Merge pull request #129832 from pohly/dra-seamless-upgrade

DRA: seamless driver upgrades
Kubernetes Prow Robot 2025-03-18 13:51:51 -07:00 committed by GitHub
commit 64621d17a6
21 changed files with 638 additions and 110 deletions

View File

@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s
return s.connectClient(pluginName, endpoint)
}
func (s *server) DeRegisterPlugin(pluginName string) {
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName)
func (s *server) DeRegisterPlugin(pluginName, endpoint string) {
klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint)
client := s.getClient(pluginName)
if client != nil {
s.disconnectClient(pluginName, client)
@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error {
s.deregisterClient(name)
return c.Disconnect()
}
func (s *server) registerClient(name string, c Client) {
s.mutex.Lock()
defer s.mutex.Unlock()

View File

@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
}
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
// The time that DRA drivers have to come back after being unregistered
// before the kubelet removes their ResourceSlices.
//
// This must be long enough to actually allow stopping a pod and
// starting the replacement (otherwise ResourceSlices get deleted
// unnecessarily) and not too long (otherwise the time window where
// pods might still get scheduled to the node after removal of a
// driver is too long).
//
// 30 seconds might be long enough for a simple container restart.
// If a DRA driver wants to be sure that slices don't get wiped,
// it should use rolling updates.
wipingDelay := 30 * time.Second
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay))
}
// Start starts the reconcile loop of the manager.

View File

@ -580,11 +580,11 @@ func TestPrepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
if test.claimInfo != nil {
manager.cache.add(test.claimInfo)
@ -717,11 +717,11 @@ func TestUnprepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests
manager := &ManagerImpl{
kubeClient: fakeKubeClient,
@ -887,11 +887,11 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
}
defer plg.DeRegisterPlugin(driverName)
defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName)
// Create ClaimInfo cache
cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName)

View File

@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) {
// ensure the plugin we are using is registered
draPlugins.add(p)
defer draPlugins.delete(pluginName)
defer draPlugins.remove(pluginName, addr)
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
for i := 0; i < 2; i++ {
@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) {
setup: func(name string) tearDown {
draPlugins.add(&Plugin{name: name})
return func() {
draPlugins.delete(name)
draPlugins.remove(name, "")
}
},
pluginName: "dummy-plugin",
@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) {
}
draPlugins.add(p)
defer draPlugins.delete(pluginName)
defer draPlugins.remove(pluginName, addr)
client, err := NewDRAPluginClient(pluginName)
if err != nil {

View File

@ -18,13 +18,16 @@ package plugin
import (
"errors"
"fmt"
"slices"
"sync"
)
// PluginsStore holds a list of DRA Plugins.
type pluginsStore struct {
sync.RWMutex
store map[string]*Plugin
// plugin name -> plugin instances in the order in which they got added
store map[string][]*Plugin
}
// draPlugins map keeps track of all registered DRA plugins on the node
@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin {
s.RLock()
defer s.RUnlock()
return s.store[pluginName]
instances := s.store[pluginName]
if len(instances) == 0 {
return nil
}
// Heuristic: pick the instance added last. It's most likely
// the newest, except when the kubelet got restarted and registered
// all running plugins in random order.
return instances[len(instances)-1]
}
// add appends a new plugin instance for its name. It returns an error
// if an instance with the same name and endpoint is already registered.
// This method is protected by a mutex.
func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) {
func (s *pluginsStore) add(p *Plugin) error {
s.Lock()
defer s.Unlock()
if s.store == nil {
s.store = make(map[string]*Plugin)
s.store = make(map[string][]*Plugin)
}
replacedPlugin, exists := s.store[p.name]
s.store[p.name] = p
if replacedPlugin != nil && replacedPlugin.cancel != nil {
replacedPlugin.cancel(errors.New("plugin got replaced"))
for _, oldP := range s.store[p.name] {
if oldP.endpoint == p.endpoint {
// One plugin instance cannot hijack the endpoint of another instance.
return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name)
}
}
return replacedPlugin, exists
s.store[p.name] = append(s.store[p.name], p)
return nil
}
// Delete lets you delete a DRA Plugin by name.
// This method is protected by a mutex.
func (s *pluginsStore) delete(pluginName string) *Plugin {
// remove removes the plugin instance that was registered for the given endpoint.
// This method is protected by a mutex. It returns the removed
// plugin (if found) and true if that was the last instance of that plugin.
func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) {
s.Lock()
defer s.Unlock()
p, exists := s.store[pluginName]
if !exists {
return nil
instances := s.store[pluginName]
i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint })
if i == -1 {
return nil, false
}
p := instances[i]
last := len(instances) == 1
if last {
delete(s.store, pluginName)
} else {
s.store[pluginName] = slices.Delete(instances, i, i+1)
}
if p.cancel != nil {
p.cancel(errors.New("plugin got removed"))
}
delete(s.store, pluginName)
return p
return p, last
}

View File

@ -22,6 +22,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAddSameName(t *testing.T) {
@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) {
firstWasCancelled := false
p := &Plugin{
name: pluginName,
cancel: func(err error) { firstWasCancelled = true },
name: pluginName,
endpoint: "old",
cancel: func(err error) { firstWasCancelled = true },
}
// ensure the plugin we are using is registered
draPlugins.add(p)
defer draPlugins.delete(p.name)
require.NoError(t, draPlugins.add(p))
defer draPlugins.remove(p.name, p.endpoint)
assert.False(t, firstWasCancelled, "should not cancel context after the first call")
// Same name, same endpoint -> error.
require.Error(t, draPlugins.add(p))
secondWasCancelled := false
p2 := &Plugin{
name: pluginName,
cancel: func(err error) { secondWasCancelled = true },
name: pluginName,
endpoint: "new",
cancel: func(err error) { secondWasCancelled = true },
}
require.NoError(t, draPlugins.add(p2))
defer draPlugins.remove(p2.name, p2.endpoint)
draPlugins.add(p2)
defer draPlugins.delete(p2.name)
assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance")
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
assert.True(t, firstWasCancelled, "should cancel context after the second call")
// Remove old plugin.
draPlugins.remove(p.name, p.endpoint)
assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal")
assert.False(t, secondWasCancelled, "should not cancel context of a new plugin")
}
@ -65,7 +75,7 @@ func TestDelete(t *testing.T) {
// ensure the plugin we are using is registered
draPlugins.add(p)
draPlugins.delete(p.name)
draPlugins.remove(p.name, "")
assert.True(t, wasCancelled, "should cancel context after the second call")
}

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"slices"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@ -53,6 +54,18 @@ type RegistrationHandler struct {
backgroundCtx context.Context
kubeClient kubernetes.Interface
getNode func() (*v1.Node, error)
wipingDelay time.Duration
mutex sync.Mutex
// pendingWipes maps a plugin name to a cancel function for
// wiping of that plugin's ResourceSlices. Entries get added
// in DeRegisterPlugin and checked in RegisterPlugin. If
// wiping is pending during RegisterPlugin, it gets canceled.
//
// Must use pointers to functions because the entries have to
// be comparable.
pendingWipes map[string]*context.CancelCauseFunc
}
var _ cache.PluginHandler = &RegistrationHandler{}
@ -62,12 +75,14 @@ var _ cache.PluginHandler = &RegistrationHandler{}
// Must only be called once per process because it manages global state.
// If a kubeClient is provided, then it synchronizes ResourceSlices
// with the resource information provided by plugins.
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
handler := &RegistrationHandler{
// The context and thus logger should come from the caller.
backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
kubeClient: kubeClient,
getNode: getNode,
wipingDelay: wipingDelay,
pendingWipes: make(map[string]*context.CancelCauseFunc),
}
// When kubelet starts up, no DRA driver has registered yet. None of
@ -77,19 +92,34 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
// to start up.
//
// This has to run in the background.
go handler.wipeResourceSlices("")
logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
ctx := klog.NewContext(handler.backgroundCtx, logger)
go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
return handler
}
// wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
func (h *RegistrationHandler) wipeResourceSlices(driver string) {
// Wiping is delayed by the given duration and can be canceled by canceling the context.
func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) {
if h.kubeClient == nil {
return
}
ctx := h.backgroundCtx
logger := klog.FromContext(ctx)
if delay != 0 {
// Before we start deleting, give the driver time to bounce back.
// Perhaps it got removed as part of a DaemonSet update and the
// replacement pod is about to start.
logger.V(4).Info("Starting to wait before wiping ResourceSlices", "delay", delay)
select {
case <-ctx.Done():
logger.V(4).Info("Aborting wiping of ResourceSlices", "reason", context.Cause(ctx))
case <-time.After(delay):
logger.V(4).Info("Starting to wipe ResourceSlices after waiting", "delay", delay)
}
}
backoff := wait.Backoff{
Duration: time.Second,
Factor: 2,
@ -148,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// into all log output related to the plugin.
ctx := h.backgroundCtx
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint)
ctx = klog.NewContext(ctx, logger)
logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint)
logger.V(3).Info("Register new DRA plugin")
chosenService, err := h.validateSupportedServices(pluginName, supportedServices)
if err != nil {
@ -179,9 +209,19 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
// Store the endpoint of the newly registered DRA plugin in the map, keyed by plugin name, so that
// all other DRA components can look up the actual socket of a DRA plugin by its name.
if err := draPlugins.add(pluginInstance); err != nil {
cancel(err)
// No wrapping, the error already contains details.
return err
}
if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced {
logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint)
// Now cancel any pending ResourceSlice wiping for this plugin.
// Only needs to be done once.
h.mutex.Lock()
defer h.mutex.Unlock()
if cancel := h.pendingWipes[pluginName]; cancel != nil {
(*cancel)(errors.New("new plugin instance registered"))
delete(h.pendingWipes, pluginName)
}
return nil
@ -220,16 +260,51 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo
// DeRegisterPlugin is called when a plugin has removed its socket,
// signaling it is no longer available.
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
if p := draPlugins.delete(pluginName); p != nil {
func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
if p, last := draPlugins.remove(pluginName, endpoint); p != nil {
// This logger includes endpoint and pluginName.
logger := klog.FromContext(p.backgroundCtx)
logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
logger.V(3).Info("Deregister DRA plugin", "lastInstance", last)
if !last {
return
}
// Prepare for canceling the background wiping. This needs to run
// in the context of the registration handler because the context
// from the plugin has already been canceled.
logger = klog.FromContext(h.backgroundCtx)
logger = klog.LoggerWithName(logger, "driver-cleanup")
logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
ctx, cancel := context.WithCancelCause(h.backgroundCtx)
ctx = klog.NewContext(ctx, logger)
// Clean up the ResourceSlices for the deleted Plugin since it
// may have died without doing so itself and might never come
// back.
go h.wipeResourceSlices(pluginName)
//
// May get canceled if the plugin comes back quickly enough
// (see RegisterPlugin).
h.mutex.Lock()
defer h.mutex.Unlock()
if cancel := h.pendingWipes[pluginName]; cancel != nil {
(*cancel)(errors.New("plugin deregistered a second time"))
}
h.pendingWipes[pluginName] = &cancel
go func() {
defer func() {
h.mutex.Lock()
defer h.mutex.Unlock()
// Cancel our own context, but remove it from the map only if it
// is the current entry. Perhaps it already got replaced.
cancel(errors.New("wiping done"))
if h.pendingWipes[pluginName] == &cancel {
delete(h.pendingWipes, pluginName)
}
}()
h.wipeResourceSlices(ctx, h.wipingDelay, pluginName)
}()
return
}

View File

@ -157,7 +157,7 @@ func TestRegistrationHandler(t *testing.T) {
}
// The handler wipes all slices at startup.
handler := NewRegistrationHandler(client, getFakeNode)
handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */)
requireNoSlices := func() {
t.Helper()
if client == nil {
@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) {
// Simulate one existing plugin A.
err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil)
require.NoError(t, err)
t.Cleanup(func() {
tCtx.Logf("Removing plugin %s", pluginA)
handler.DeRegisterPlugin(pluginA, endpointA)
})
err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices)
if test.shouldError {
@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) {
assert.NoError(t, err, "recreate slice")
}
handler.DeRegisterPlugin(test.pluginName)
tCtx.Logf("Removing plugin %s", test.pluginName)
handler.DeRegisterPlugin(test.pluginName, test.endpoint)
// Nop.
handler.DeRegisterPlugin(test.pluginName)
handler.DeRegisterPlugin(test.pluginName, test.endpoint)
requireNoSlices()
})

View File

@ -89,6 +89,7 @@ type PluginInfo struct {
UUID types.UID
Handler PluginHandler
Name string
Endpoint string
}
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {

View File

@ -56,5 +56,5 @@ type PluginHandler interface {
RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error
// DeRegisterPlugin is called once the pluginwatcher observes that the socket has
// been deleted.
DeRegisterPlugin(pluginName string)
DeRegisterPlugin(pluginName, endpoint string)
}

View File

@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc(
UUID: pluginUUID,
Handler: handler,
Name: infoResp.Name,
Endpoint: infoResp.Endpoint,
})
if err != nil {
klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath)
@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc(
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath)
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name)
pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint)
klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler)
return nil

View File

@ -54,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler {
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
f.events = append(f.events, "validate "+pluginName)
f.events = append(f.events, "validate "+pluginName+" "+endpoint)
return nil
}
@ -62,15 +62,15 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
f.Lock()
defer f.Unlock()
f.events = append(f.events, "register "+pluginName)
f.events = append(f.events, "register "+pluginName+" "+endpoint)
return nil
}
// DeRegisterPlugin is a fake method
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
func (f *fakePluginHandler) DeRegisterPlugin(pluginName, endpoint string) {
f.Lock()
defer f.Unlock()
f.events = append(f.events, "deregister "+pluginName)
f.events = append(f.events, "deregister "+pluginName+" "+endpoint)
}
func (f *fakePluginHandler) Reset() {
@ -93,8 +93,24 @@ func cleanup(t *testing.T) {
os.MkdirAll(socketDir, 0755)
}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) {
expected := []string{"validate " + pluginName, "register " + pluginName}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) {
t.Helper()
waitFor(t, fakePluginHandler,
[]string{"validate " + pluginName + " " + endpoint, "register " + pluginName + " " + endpoint},
"Timed out waiting for plugin to be added to actual state of world cache.",
)
}
func waitForDeRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) {
t.Helper()
waitFor(t, fakePluginHandler,
[]string{"deregister " + pluginName + " " + endpoint},
"Timed out waiting for plugin to be removed from actual state of the world cache.",
)
}
func waitFor(t *testing.T, fakePluginHandler *fakePluginHandler, expected []string, what string) {
t.Helper()
err := retryWithExponentialBackOff(
100*time.Millisecond,
func() (bool, error) {
@ -108,7 +124,7 @@ func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, plu
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
t.Fatal(what)
}
}
@ -122,7 +138,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio
return wait.ExponentialBackoff(backoff, fn)
}
func TestPluginRegistration(t *testing.T) {
func TestPluginManager(t *testing.T) {
defer cleanup(t)
pluginManager := newTestPluginManager(socketDir)
@ -157,7 +173,12 @@ func TestPluginRegistration(t *testing.T) {
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
// Verify that the plugin is registered
waitForRegistration(t, fakeHandler, pluginName)
waitForRegistration(t, fakeHandler, pluginName, socketPath)
// And unregister.
fakeHandler.Reset()
require.NoError(t, p.Stop())
waitForDeRegistration(t, fakeHandler, pluginName, socketPath)
}
}

View File

@ -16,6 +16,92 @@ there.
This socket filename should not start with a '.' as it will be ignored.
To avoid conflicts between different plugins, the recommendation is to use
`<plugin name>[-<some optional string>].sock` as filename. `<plugin name>`
should end with a DNS domain that is unique for the plugin. Each time a plugin
starts, it has to delete old sockets if they exist and listen anew under the
same filename.
## Seamless Upgrade
To avoid downtime of a plugin on a node, it would be nice to support running an
old plugin in parallel to the new plugin. When deploying with a DaemonSet,
setting `maxSurge` to a value larger than zero enables such a seamless upgrade.
**Warning**: Such a seamless upgrade is only supported for DRA at the moment.
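As a minimal sketch (assuming the standard `apps/v1` Go types; a real deployment would normally set this directly in the DaemonSet manifest), such an update strategy keeps the old driver pod running until the surged replacement is ready:

```go
import (
	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/utils/ptr"
)

// Surge one extra driver pod per node during an update instead of
// taking the old pod down first, so both instances overlap briefly.
var seamlessUpdateStrategy = appsv1.DaemonSetUpdateStrategy{
	Type: appsv1.RollingUpdateDaemonSetStrategyType,
	RollingUpdate: &appsv1.RollingUpdateDaemonSet{
		MaxSurge:       ptr.To(intstr.FromInt32(1)),
		MaxUnavailable: ptr.To(intstr.FromInt32(0)),
	},
}
```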
### In a plugin
*Note*: For DRA, the
[k8s.io/dynamic-resource-allocation](https://pkg.go.dev/k8s.io/dynamic-resource-allocation/kubeletplugin)
helper package offers the `RollingUpdate` option which implements the socket
handling as described in this section.
To support seamless upgrades, each plugin instance must use a unique
socket filename. Otherwise the following could happen:
- The old instance is registered with `plugin.example.com-reg.sock`.
- The new instance starts, unlinks that file, and starts listening on it again.
- In parallel, the kubelet notices the removal and unregisters the plugin
before probing the new instance, thus breaking the seamless upgrade.
Even if the timing is more favorable and unregistration is avoided, using the
same socket is problematic: if the new instance fails, the kubelet cannot fall
back to the old instance because that old instance is not listening to the
socket that is available under `plugin.example.com-reg.sock`.
Unique filenames can be achieved in a DaemonSet by passing the UID of the pod into
the driver container through the downward API. New instances may try to clean up stale sockets of
older instances, but have to be absolutely sure that those sockets really
aren't in use anymore. Each instance should catch termination signals and clean
up after itself. Then sockets only leak during abnormal events (power loss,
killing with SIGKILL).
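A minimal sketch of how a DRA driver might wire this up with the helper package (the `POD_UID` and `NODE_NAME` environment variables and the driver name are assumptions for this example; the values would be injected via downward API `fieldRef` entries for `metadata.uid` and `spec.nodeName`):

```go
package main

import (
	"context"
	"os"
	"os/signal"
	"syscall"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

func run(driver kubeletplugin.DRAPlugin) error {
	// Shut down cleanly on SIGINT/SIGTERM so this instance removes its
	// own per-instance sockets instead of leaking them.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	helper, err := kubeletplugin.Start(ctx, driver,
		kubeletplugin.DriverName("plugin.example.com"),     // assumed driver name
		kubeletplugin.NodeName(os.Getenv("NODE_NAME")),      // assumed env variable
		// Enable per-instance socket names, derived from the pod UID that
		// the DaemonSet injects through the downward API (assumed env variable).
		kubeletplugin.RollingUpdate(types.UID(os.Getenv("POD_UID"))),
	)
	if err != nil {
		return err
	}
	defer helper.Stop()

	<-ctx.Done() // serve until asked to terminate
	return nil
}
```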
Last but not least, both plugin instances must be usable in parallel. It is not
predictable which instance the kubelet will use for which request.
### In the kubelet
For such a seamless upgrade with different sockets per plugin to work reliably,
the handler for the plugin type must track all registered instances. Then if
one of them fails and gets unregistered, it can fall back to another
one. Picking the most recently registered instance is a good heuristic. This
isn't perfect because after a kubelet restart, plugin instances get registered
in a random order. Restarting the kubelet in the middle of an upgrade should be
rare.
At the moment, the following handlers do not support such seamless upgrades:
- The device plugin handler suffers from temporarily removing the extended
resources during an upgrade. A proposed fix is pending in
https://github.com/kubernetes/kubernetes/pull/127821.
- The CSI handler [tries to determine which instance is newer](https://github.com/kubernetes/kubernetes/blob/7140b4910c6c1179c9778a7f3bb8037356febd58/pkg/volume/csi/csi_plugin.go#L115-L125) based on the supported version(s) and
only remembers that one. If that newest instance fails, there is no fallback.
In practice, most CSI drivers pass [the hard-coded "1.0.0"](https://github.com/kubernetes-csi/node-driver-registrar/blob/27700e2962cd35b9f2336a156146181e5c75399e/cmd/csi-node-driver-registrar/main.go#L72)
from the csi-node-driver-registrar as the supported version, so this version
selection mechanism isn't used at all.
The following handler does support seamless upgrades:
- DRA
### Deployment
Deploying a plugin with support for seamless upgrades and per-instance socket
filenames is *not* compatible with a kubelet version that does not have support
for seamless upgrades yet. It breaks like this:
- New instance starts, gets registered and replaces the old one.
- Old instance stops, removing its socket.
- The kubelet notices that and unregisters the plugin.
- The plugin handler removes *the new* instance because it ignores the socket path -> no instance left.
Plugin authors either have to assume that the cluster has a recent enough
kubelet or rely on labeling nodes with support. Then the plugin can use one
simple DaemonSet for nodes without support and another, more complex one where
`maxSurge` is increased to enable seamless upgrades on nodes which support it.
No such label is specified at the moment.
## gRPC Service Lifecycle

View File

@ -132,7 +132,7 @@ func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions
}
// DeRegisterPlugin is a dummy implementation
func (d *DummyImpl) DeRegisterPlugin(pluginName string) {
func (d *DummyImpl) DeRegisterPlugin(pluginName, endpoint string) {
}
// Calls Run()

View File

@ -265,8 +265,8 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en
// DeRegisterPlugin is called when a plugin removed its socket, signaling
// it is no longer available
func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) {
klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s, endpoint %s", pluginName, endpoint))
if err := unregisterDriver(pluginName); err != nil {
klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
}

View File

@ -12,6 +12,7 @@ require (
github.com/google/go-cmp v0.7.0
github.com/onsi/gomega v1.35.1
github.com/stretchr/testify v1.10.0
go.etcd.io/etcd/client/pkg/v3 v3.5.16
google.golang.org/grpc v1.68.1
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
@ -57,6 +58,8 @@ require (
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel v1.33.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect

View File

@ -158,6 +158,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28=
go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q=
go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E=
go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE=
go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50=
@ -176,9 +177,12 @@ go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJ
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

View File

@ -21,12 +21,14 @@ import (
"errors"
"fmt"
"net"
"os"
"path"
"sync"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -195,8 +197,10 @@ func RegistrarDirectoryPath(path string) Option {
// support updates from an installation which used an older release of
// of the helper code.
//
// The default is <driver name>-reg.sock. When rolling updates are enabled (not supported yet),
// The default is <driver name>-reg.sock. When rolling updates are enabled,
// it is <driver name>-<uid>-reg.sock.
//
// This option and [RollingUpdate] are mutually exclusive.
func RegistrarSocketFilename(name string) Option {
return func(o *options) error {
o.pluginRegistrationEndpoint.file = name
@ -248,6 +252,44 @@ func PluginListener(listen func(ctx context.Context, path string) (net.Listener,
}
}
// RollingUpdate can be used to enable support for running two plugin instances
// in parallel while a newer instance replaces the older. When enabled, both
// instances must share the same plugin data directory and driver name.
// They create different sockets to allow the kubelet to connect to both at
// the same time.
//
// There is no guarantee which of the two instances is used by the kubelet.
// For example, it can happen that a claim gets prepared by one instance
// and then needs to be unprepared by the other. Kubelet then may fall back
// to the first one again for some other operation. In practice this means
// that each instance must be entirely stateless across method calls.
// Serialization (on by default, see [Serialize]) ensures that methods
// are serialized across all instances through file locking. The plugin
// implementation can load shared state from a file at the start
// of a call, execute and then store the updated shared state again.
//
// Passing a non-empty uid enables rolling updates; an empty uid disables it.
// The uid must be the pod UID. A DaemonSet can pass that into the driver container
// via the downward API (https://kubernetes.io/docs/concepts/workloads/pods/downward-api/#downwardapi-fieldRef).
//
// Because new instances cannot remove stale sockets of older instances,
// it is important that each pod shuts down cleanly: it must catch SIGINT/TERM
// and stop the helper instead of quitting immediately.
//
// This depends on support in the kubelet which was added in Kubernetes 1.33.
// Don't use this if it is not certain that the kubelet has that support!
//
// This option and [RegistrarSocketFilename] are mutually exclusive.
func RollingUpdate(uid types.UID) Option {
return func(o *options) error {
o.rollingUpdateUID = uid
// TODO: ask the kubelet whether that pod is still running and
// clean up leftover sockets?
return nil
}
}
// GRPCInterceptor is called for each incoming gRPC method call. This option
// may be used more than once and each interceptor will get called.
func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
@ -322,6 +364,17 @@ func Serialize(enabled bool) Option {
}
}
// FlockDirectoryPath changes where lock files are created and locked. A lock file
// is needed when serializing gRPC calls and rolling updates are enabled.
// The directory must exist and be reserved for exclusive use by the
// driver. The default is the plugin data directory.
func FlockDirectoryPath(path string) Option {
return func(o *options) error {
o.flockDirectoryPath = path
return nil
}
}
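// Illustrative sketch (not part of this file): combining RollingUpdate with
// FlockDirectoryPath. The driver name, lock directory, and POD_UID variable
// are assumptions for the example; the pod UID comes from the downward API.
//
//	helper, err := kubeletplugin.Start(ctx, driver,
//		kubeletplugin.DriverName("plugin.example.com"),
//		kubeletplugin.RollingUpdate(types.UID(os.Getenv("POD_UID"))),
//		// Serialization stays enabled (the default). With rolling updates it
//		// is implemented via a lock file in this directory, which both plugin
//		// instances must share.
//		kubeletplugin.FlockDirectoryPath("/var/lib/plugin.example.com/locks"),
//	)
//	if err != nil {
//		return err
//	}
//	defer helper.Stop()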
type options struct {
logger klog.Logger
grpcVerbosity int
@ -330,11 +383,13 @@ type options struct {
nodeUID types.UID
pluginRegistrationEndpoint endpoint
pluginDataDirectoryPath string
rollingUpdateUID types.UID
draEndpointListen func(ctx context.Context, path string) (net.Listener, error)
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
kubeClient kubernetes.Interface
serialize bool
flockDirectoryPath string
nodeV1beta1 bool
}
@ -344,17 +399,18 @@ type Helper struct {
// backgroundCtx is for activities that are started later.
backgroundCtx context.Context
// cancel cancels the backgroundCtx.
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
pluginServer *grpcServer
plugin DRAPlugin
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
serialize bool
grpcMutex sync.Mutex
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
pluginServer *grpcServer
plugin DRAPlugin
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
serialize bool
grpcMutex sync.Mutex
grpcLockFilePath string
// Information about resource publishing changes concurrently and thus
// must be protected by the mutex. The controller gets started only
@ -392,12 +448,20 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
if o.driverName == "" {
return nil, errors.New("driver name must be set")
}
if o.rollingUpdateUID != "" && o.pluginRegistrationEndpoint.file != "" {
return nil, errors.New("rolling updates and explicit registration socket filename are mutually exclusive")
}
uidPart := ""
if o.rollingUpdateUID != "" {
uidPart = "-" + string(o.rollingUpdateUID)
}
if o.pluginRegistrationEndpoint.file == "" {
o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
o.pluginRegistrationEndpoint.file = o.driverName + uidPart + "-reg.sock"
}
if o.pluginDataDirectoryPath == "" {
o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
}
d := &Helper{
driverName: o.driverName,
nodeName: o.nodeName,
@ -406,6 +470,14 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
serialize: o.serialize,
plugin: plugin,
}
if o.rollingUpdateUID != "" {
dir := o.pluginDataDirectoryPath
if o.flockDirectoryPath != "" {
dir = o.flockDirectoryPath
}
// Enable file locking, required for concurrently running pods.
d.grpcLockFilePath = path.Join(dir, "serialize.lock")
}
// Stop calls cancel and therefore both cancellation
// and Stop cause goroutines to stop.
@ -434,7 +506,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
var supportedServices []string
draEndpoint := endpoint{
dir: o.pluginDataDirectoryPath,
file: "dra.sock", // "dra" is hard-coded.
file: "dra" + uidPart + ".sock", // "dra" is hard-coded. The directory is unique, so we get a unique full path also without the UID.
listenFunc: o.draEndpointListen,
}
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
@ -575,12 +647,25 @@ func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus {
// serializeGRPCIfEnabled locks a mutex (or, with rolling updates, a lock file)
// if serialization is enabled. Either way it returns a function that the caller
// must invoke via defer, or an error if taking the file lock failed.
func (d *Helper) serializeGRPCIfEnabled() func() {
func (d *Helper) serializeGRPCIfEnabled() (func(), error) {
if !d.serialize {
return func() {}
return func() {}, nil
}
// If rolling updates are enabled, we cannot do only in-memory locking.
// We must use file locking.
if d.grpcLockFilePath != "" {
file, err := fileutil.LockFile(d.grpcLockFilePath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, fmt.Errorf("lock file: %w", err)
}
return func() {
_ = file.Close()
}, nil
}
d.grpcMutex.Lock()
return d.grpcMutex.Unlock
return d.grpcMutex.Unlock, nil
}
// nodePluginImplementation is a thin wrapper around the helper instance.
@ -597,7 +682,11 @@ func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req
return nil, fmt.Errorf("get resource claims: %w", err)
}
defer d.serializeGRPCIfEnabled()()
unlock, err := d.serializeGRPCIfEnabled()
if err != nil {
return nil, fmt.Errorf("serialize gRPC: %w", err)
}
defer unlock()
result, err := d.plugin.PrepareResourceClaims(ctx, claims)
if err != nil {
@ -659,7 +748,11 @@ func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims
// NodeUnprepareResources implements [draapi.NodeUnprepareResources].
func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
defer d.serializeGRPCIfEnabled()
unlock, err := d.serializeGRPCIfEnabled()
if err != nil {
return nil, fmt.Errorf("serialize gRPC: %w", err)
}
defer unlock()
claims := make([]NamespacedObject, 0, len(req.Claims))
for _, claim := range req.Claims {

View File

@ -66,6 +66,7 @@ import (
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
"k8s.io/kubernetes/test/e2e/storage/utils"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
)
@ -76,6 +77,7 @@ const (
type Nodes struct {
NodeNames []string
tempDir string
}
type Resources struct {
@ -111,6 +113,8 @@ func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes
}
func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
nodes.tempDir = ginkgo.GinkgoT().TempDir()
ginkgo.By("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
@ -214,11 +218,16 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
// not run on all nodes.
resources.Nodes = nodes.NodeNames
}
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
d.SetUp(nodes, resources, devicesPerNode...)
ginkgo.DeferCleanup(d.TearDown)
}
// NewGetSlices generates a function for gomega.Eventually/Consistently which
// returns the ResourceSliceList.
func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] {
return framework.ListObjects(d.f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
}
type MethodInstance struct {
Nodename string
FullMethod string
@ -227,12 +236,26 @@ type MethodInstance struct {
type Driver struct {
f *framework.Framework
ctx context.Context
cleanup []func() // executed first-in-first-out
cleanup []func(context.Context) // executed first-in-first-out
wg sync.WaitGroup
serviceAccountName string
// NameSuffix can be set while registering a test to deploy different
// drivers in the same test namespace.
NameSuffix string
Name string
// InstanceSuffix can be set while registering a test to deploy two different
// instances of the same driver. Used to generate unique objects in the API server.
// The socket path is still the same.
InstanceSuffix string
// RollingUpdate can be set to true to enable using different socket names
// for different pods and thus seamless upgrades. Must be supported by the kubelet!
RollingUpdate bool
// Name gets derived automatically from the current test namespace and
// (if set) the NameSuffix while setting up the driver for a test.
Name string
// Nodes contains entries for each node selected for a test when the test runs.
// In addition, there is one entry for a fictional node.
@ -263,9 +286,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
ctx, cancel := context.WithCancel(context.Background())
logger := klog.FromContext(ctx)
logger = klog.LoggerWithValues(logger, "driverName", d.Name)
if d.InstanceSuffix != "" {
instance, _ := strings.CutPrefix(d.InstanceSuffix, "-")
logger = klog.LoggerWithValues(logger, "instance", instance)
}
ctx = klog.NewContext(ctx, logger)
d.ctx = ctx
d.cleanup = append(d.cleanup, cancel)
d.cleanup = append(d.cleanup, func(context.Context) { cancel() })
if !resources.NodeLocal {
// Publish one resource pool with "network-attached" devices.
@ -323,28 +350,31 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
}
// Create service account and corresponding RBAC rules.
d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account"
d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account"
content := pluginPermissions
content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name)
content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix)
d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
// Using a ReplicaSet instead of a DaemonSet has the advantage that we can control
// the lifecycle explicitly, in particular run two pods per node long enough to
// run checks.
instanceKey := "app.kubernetes.io/instance"
rsName := ""
numNodes := int32(len(nodes.NodeNames))
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
registrarSocketFilename := d.Name + "-reg.sock"
instanceName := d.Name + d.InstanceSuffix
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
switch item := item.(type) {
case *appsv1.ReplicaSet:
item.Name += d.NameSuffix
item.Name += d.NameSuffix + d.InstanceSuffix
rsName = item.Name
item.Spec.Replicas = &numNodes
item.Spec.Selector.MatchLabels[instanceKey] = d.Name
item.Spec.Template.Labels[instanceKey] = d.Name
item.Spec.Selector.MatchLabels[instanceKey] = instanceName
item.Spec.Template.Labels[instanceKey] = instanceName
item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName
item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = instanceName
item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
@ -376,7 +406,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies running")
}
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName})
framework.ExpectNoError(err, "create label selector requirement")
selector := labels.NewSelector().Add(*requirement)
pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
@ -429,6 +459,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
// All listeners running in this pod use a new unique local port number
// by atomically incrementing this variable.
listenerPort := int32(9000)
rollingUpdateUID := pod.UID
serialize := true
if !d.RollingUpdate {
rollingUpdateUID = ""
// A test might have to execute two gRPC calls in parallel, so only
// serialize when we explicitly want to test a rolling update.
serialize = false
}
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
kubeletplugin.GRPCVerbosity(0),
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@ -438,17 +476,31 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
return d.streamInterceptor(nodename, srv, ss, info, handler)
}),
kubeletplugin.RollingUpdate(rollingUpdateUID),
kubeletplugin.Serialize(serialize),
kubeletplugin.FlockDirectoryPath(nodes.tempDir),
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
d.cleanup = append(d.cleanup, func() {
d.cleanup = append(d.cleanup, func(ctx context.Context) {
// Depends on cancel being called first.
plugin.Stop()
// Also explicitly stop all pods.
ginkgo.By("scaling down driver proxy pods for " + d.Name)
rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
rs.Spec.Replicas = ptr.To(int32(0))
rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{})
framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil {
framework.ExpectNoError(err, "all kubelet plugin proxies stopped")
}
})
d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
}
@ -717,14 +769,19 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
return writer
}
func (d *Driver) TearDown() {
func (d *Driver) TearDown(ctx context.Context) {
for _, c := range d.cleanup {
c()
c(ctx)
}
d.cleanup = nil
d.wg.Wait()
}
// IsGone checks that the kubelet is done with the driver.
// This is done by waiting for the kubelet to remove the
// driver's ResourceSlices, which takes at least 5 minutes
// because of the delay in the kubelet. Only use this in slow
// tests...
func (d *Driver) IsGone(ctx context.Context) {
gomega.Eventually(ctx, func(ctx context.Context) ([]resourceapi.ResourceSlice, error) {
slices, err := d.f.ClientSet.ResourceV1beta1().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
@ -732,7 +789,7 @@ func (d *Driver) IsGone(ctx context.Context) {
return nil, err
}
return slices.Items, err
}).Should(gomega.BeEmpty())
}).WithTimeout(7 * time.Minute).Should(gomega.BeEmpty())
}
func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {

View File

@ -1801,6 +1801,150 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
ginkgo.It("rolling update", func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 1)
oldDriver := NewDriverInstance(f)
oldDriver.InstanceSuffix = "-old"
oldDriver.RollingUpdate = true
oldDriver.Run(nodes, perNode(1, nodes))
// We expect one ResourceSlice per node from the driver.
getSlices := oldDriver.NewGetSlices()
gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
initialSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
// Same driver name, different socket paths because of rolling update.
newDriver := NewDriverInstance(f)
newDriver.InstanceSuffix = "-new"
newDriver.RollingUpdate = true
newDriver.Run(nodes, perNode(1, nodes))
// Stop old driver instance.
oldDriver.TearDown(ctx)
// Build behaves the same for both driver instances.
b := newBuilderNow(ctx, f, oldDriver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
b.testPod(ctx, f, pod)
// The exact same slices should still exist.
finalSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items))
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
ginkgo.It("failed update", func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 1)
oldDriver := NewDriverInstance(f)
oldDriver.InstanceSuffix = "-old"
oldDriver.RollingUpdate = true
oldDriver.Run(nodes, perNode(1, nodes))
// We expect one ResourceSlice per node from the driver.
getSlices := oldDriver.NewGetSlices()
gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
initialSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
// Same driver name, different socket paths because of rolling update.
newDriver := NewDriverInstance(f)
newDriver.InstanceSuffix = "-new"
newDriver.RollingUpdate = true
newDriver.Run(nodes, perNode(1, nodes))
// Stop new driver instance, simulating the failure of the new instance.
// The kubelet should still have the old instance.
newDriver.TearDown(ctx)
// Build behaves the same for both driver instances.
b := newBuilderNow(ctx, f, oldDriver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
b.testPod(ctx, f, pod)
// The exact same slices should still exist.
finalSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items))
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 1)
// Same driver name, same socket path.
oldDriver := NewDriverInstance(f)
oldDriver.InstanceSuffix = "-old"
oldDriver.Run(nodes, perNode(1, nodes))
// Collect set of resource slices for that driver.
listSlices := framework.ListObjects(f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{
FieldSelector: "spec.driver=" + oldDriver.Name,
})
gomega.Eventually(ctx, listSlices).Should(gomega.HaveField("Items", gomega.Not(gomega.BeEmpty())), "driver should have published ResourceSlices, got none")
oldSlices, err := listSlices(ctx)
framework.ExpectNoError(err, "list slices published by old driver")
if len(oldSlices.Items) == 0 {
framework.Fail("driver should have published ResourceSlices, got none")
}
// "Update" the driver by taking it down and bringing up a new one.
// Pods never run in parallel, similar to how a DaemonSet would update
// its pods when maxSurge is zero.
ginkgo.By("reinstall driver")
start := time.Now()
oldDriver.TearDown(ctx)
newDriver := NewDriverInstance(f)
newDriver.InstanceSuffix = "-new"
newDriver.Run(nodes, perNode(1, nodes))
updateDuration := time.Since(start)
// Build behaves the same for both driver instances.
b := newBuilderNow(ctx, f, oldDriver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
b.testPod(ctx, f, pod)
// The slices should have survived the update, but only if it happened
// quickly enough. If it took too long, the kubelet considered the driver
// gone and removed them.
if updateDuration <= 3*time.Minute {
newSlices, err := listSlices(ctx)
framework.ExpectNoError(err, "list slices again")
gomega.Expect(newSlices.Items).To(gomega.ConsistOf(oldSlices.Items))
}
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
// Now shut down for good and wait for the kubelet to react.
// This takes time...
ginkgo.By("uninstalling driver and waiting for ResourceSlice wiping")
newDriver.TearDown(ctx)
newDriver.IsGone(ctx)
})
})
// builder contains a running counter to make objects unique within their

View File

@ -168,7 +168,6 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
kubeletplugin.KubeClient(kubeClient),
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
kubeletplugin.Serialize(false), // The example plugin does its own locking.
)
d, err := kubeletplugin.Start(ctx, ex, opts...)
if err != nil {