DRA kubeletplugin: add RollingUpdate

When the new RollingUpdate option is used, the DRA driver gets deployed such
that it uses unique socket paths and file locking to serialize gRPC calls.
This enables the kubelet to pick arbitrarily between two concurrently running
instances. The handover is seamless (no downtime, no removal of ResourceSlices
by the kubelet).

For file locking, the fileutil package from etcd is used because it was
already a Kubernetes dependency. Unfortunately that package brings in some
additional indirect dependencies for DRA drivers (zap, multierr), but those
seem acceptable.
Patrick Ohly
2025-03-13 12:03:10 +01:00
parent b471c2c11f
commit 582b421393
7 changed files with 235 additions and 23 deletions
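
To make the new option concrete, here is a minimal, hedged sketch (not part of this commit) of how a DaemonSet-deployed driver might feed its pod UID into RollingUpdate. The env var names POD_UID and NODE_NAME and the driver name plugin.example.com are assumptions of this example; the DaemonSet would inject the UID via the downward API (metadata.uid).

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// runDriver starts the kubelet plugin helper with rolling updates enabled.
// An empty POD_UID disables rolling updates again.
func runDriver(plugin kubeletplugin.DRAPlugin) error {
	// Shut down cleanly on SIGINT/SIGTERM so that the registration socket
	// gets removed; a new instance cannot clean up sockets of an old one.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	podUID := types.UID(os.Getenv("POD_UID")) // injected via downward API (assumed env var name)
	helper, err := kubeletplugin.Start(ctx, plugin,
		kubeletplugin.DriverName("plugin.example.com"),
		kubeletplugin.NodeName(os.Getenv("NODE_NAME")),
		kubeletplugin.RollingUpdate(podUID), // unique sockets + file locking
	)
	if err != nil {
		return fmt.Errorf("start kubelet plugin helper: %w", err)
	}
	defer helper.Stop()

	<-ctx.Done()
	return nil
}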

View File

@@ -32,6 +32,11 @@ setting `maxSurge` to a value larger than zero enables such a seamless upgrade.
### In a plugin
*Note*: For DRA, the
[k8s.io/dynamic-resource-allocation](https://pkg.go.dev/k8s.io/dynamic-resource-allocation/kubeletplugin)
helper package offers the `RollingUpdate` option which implements the socket
handling as described in this section.
To support seamless upgrades, each plugin instance must use a unique
socket filename. Otherwise the following could happen:
- The old instance is registered with `plugin.example.com-reg.sock`.

View File

@@ -12,6 +12,7 @@ require (
github.com/google/go-cmp v0.7.0
github.com/onsi/gomega v1.35.1
github.com/stretchr/testify v1.10.0
go.etcd.io/etcd/client/pkg/v3 v3.5.16
google.golang.org/grpc v1.68.1
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
@@ -57,6 +58,8 @@ require (
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/otel v1.33.0 // indirect
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.27.0 // indirect

View File

@@ -158,6 +158,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28=
go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q=
go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E=
go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE=
go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50=
@@ -176,9 +177,12 @@ go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJ
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

View File

@@ -21,12 +21,14 @@ import (
"errors"
"fmt"
"net"
"os"
"path"
"sync"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
resourceapi "k8s.io/api/resource/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@@ -195,8 +197,10 @@ func RegistrarDirectoryPath(path string) Option {
// support updates from an installation which used an older release of
// the helper code.
//
// The default is <driver name>-reg.sock. When rolling updates are enabled (not supported yet),
// The default is <driver name>-reg.sock. When rolling updates are enabled,
// it is <driver name>-<uid>-reg.sock.
//
// This option and [RollingUpdate] are mutually exclusive.
func RegistrarSocketFilename(name string) Option {
return func(o *options) error {
o.pluginRegistrationEndpoint.file = name
@@ -248,6 +252,44 @@ func PluginListener(listen func(ctx context.Context, path string) (net.Listener,
}
}
// RollingUpdate can be used to enable support for running two plugin instances
// in parallel while a newer instance replaces the older. When enabled, both
// instances must share the same plugin data directory and driver name.
// They create different sockets to allow the kubelet to connect to both at
// the same time.
//
// There is no guarantee which of the two instances is used by the kubelet.
// For example, it can happen that a claim gets prepared by one instance
// and then needs to be unprepared by the other. The kubelet may then fall back
// to the first one again for some other operation. In practice this means
// that each instance must be entirely stateless across method calls.
// Serialization (on by default, see [Serialize]) ensures that methods
// are serialized across all instances through file locking. The plugin
// implementation can load shared state from a file at the start
// of a call, execute and then store the updated shared state again.
//
// Passing a non-empty uid enables rolling updates; an empty uid disables them.
// The uid must be the pod UID. A DaemonSet can pass that into the driver container
// via the downward API (https://kubernetes.io/docs/concepts/workloads/pods/downward-api/#downwardapi-fieldRef).
//
// Because new instances cannot remove stale sockets of older instances,
// it is important that each pod shuts down cleanly: it must catch SIGINT/TERM
// and stop the helper instead of quitting immediately.
//
// This depends on support in the kubelet which was added in Kubernetes 1.33.
// Don't use this if it is not certain that the kubelet has that support!
//
// This option and [RegistrarSocketFilename] are mutually exclusive.
func RollingUpdate(uid types.UID) Option {
return func(o *options) error {
o.rollingUpdateUID = uid
// TODO: ask the kubelet whether that pod is still running and
// clean up leftover sockets?
return nil
}
}
// GRPCInterceptor is called for each incoming gRPC method call. This option
// may be used more than once and each interceptor will get called.
func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option {
@@ -322,6 +364,17 @@ func Serialize(enabled bool) Option {
}
}
// FlockDirectoryPath changes where lock files are created and locked. A lock file
// is needed when serializing gRPC calls and rolling updates are enabled.
// The directory must exist and be reserved for exclusive use by the
// driver. The default is the plugin data directory.
func FlockDirectoryPath(path string) Option {
return func(o *options) error {
o.flockDirectoryPath = path
return nil
}
}
type options struct {
logger klog.Logger
grpcVerbosity int
@@ -330,11 +383,13 @@ type options struct {
nodeUID types.UID
pluginRegistrationEndpoint endpoint
pluginDataDirectoryPath string
rollingUpdateUID types.UID
draEndpointListen func(ctx context.Context, path string) (net.Listener, error)
unaryInterceptors []grpc.UnaryServerInterceptor
streamInterceptors []grpc.StreamServerInterceptor
kubeClient kubernetes.Interface
serialize bool
flockDirectoryPath string
nodeV1beta1 bool
}
@@ -344,17 +399,18 @@ type Helper struct {
// backgroundCtx is for activities that are started later.
backgroundCtx context.Context
// cancel cancels the backgroundCtx.
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
pluginServer *grpcServer
plugin DRAPlugin
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
serialize bool
grpcMutex sync.Mutex
cancel func(cause error)
wg sync.WaitGroup
registrar *nodeRegistrar
pluginServer *grpcServer
plugin DRAPlugin
driverName string
nodeName string
nodeUID types.UID
kubeClient kubernetes.Interface
serialize bool
grpcMutex sync.Mutex
grpcLockFilePath string
// Information about resource publishing changes concurrently and thus
// must be protected by the mutex. The controller gets started only
@@ -392,12 +448,20 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
if o.driverName == "" {
return nil, errors.New("driver name must be set")
}
if o.rollingUpdateUID != "" && o.pluginRegistrationEndpoint.file != "" {
return nil, errors.New("rolling updates and explicit registration socket filename are mutually exclusive")
}
uidPart := ""
if o.rollingUpdateUID != "" {
uidPart = "-" + string(o.rollingUpdateUID)
}
if o.pluginRegistrationEndpoint.file == "" {
o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
o.pluginRegistrationEndpoint.file = o.driverName + uidPart + "-reg.sock"
}
if o.pluginDataDirectoryPath == "" {
o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
}
d := &Helper{
driverName: o.driverName,
nodeName: o.nodeName,
@@ -406,6 +470,14 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
serialize: o.serialize,
plugin: plugin,
}
if o.rollingUpdateUID != "" {
dir := o.pluginDataDirectoryPath
if o.flockDirectoryPath != "" {
dir = o.flockDirectoryPath
}
// Enable file locking, required for concurrently running pods.
d.grpcLockFilePath = path.Join(dir, "serialize.lock")
}
// Stop calls cancel and therefore both cancellation
// and Stop cause goroutines to stop.
@@ -434,7 +506,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
var supportedServices []string
draEndpoint := endpoint{
dir: o.pluginDataDirectoryPath,
file: "dra.sock", // "dra" is hard-coded.
file: "dra" + uidPart + ".sock", // "dra" is hard-coded. The directory is unique, so we get a unique full path also without the UID.
listenFunc: o.draEndpointListen,
}
pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
@@ -575,12 +647,25 @@ func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus {
// serializeGRPCIfEnabled locks a mutex if serialization is enabled.
// Either way it returns a method that the caller must invoke
// via defer.
func (d *Helper) serializeGRPCIfEnabled() func() {
func (d *Helper) serializeGRPCIfEnabled() (func(), error) {
if !d.serialize {
return func() {}
return func() {}, nil
}
// If rolling updates are enabled, we cannot do only in-memory locking.
// We must use file locking.
if d.grpcLockFilePath != "" {
file, err := fileutil.LockFile(d.grpcLockFilePath, os.O_RDWR|os.O_CREATE, 0666)
if err != nil {
return nil, fmt.Errorf("lock file: %w", err)
}
return func() {
_ = file.Close()
}, nil
}
d.grpcMutex.Lock()
return d.grpcMutex.Unlock
return d.grpcMutex.Unlock, nil
}
// nodePluginImplementation is a thin wrapper around the helper instance.
@@ -597,7 +682,11 @@ func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req
return nil, fmt.Errorf("get resource claims: %w", err)
}
defer d.serializeGRPCIfEnabled()()
unlock, err := d.serializeGRPCIfEnabled()
if err != nil {
return nil, fmt.Errorf("serialize gRPC: %w", err)
}
defer unlock()
result, err := d.plugin.PrepareResourceClaims(ctx, claims)
if err != nil {
@@ -659,7 +748,11 @@ func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims
// NodeUnprepareResources implements [draapi.NodeUnprepareResources].
func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
defer d.serializeGRPCIfEnabled()
unlock, err := d.serializeGRPCIfEnabled()
if err != nil {
return nil, fmt.Errorf("serialize gRPC: %w", err)
}
defer unlock()
claims := make([]NamespacedObject, 0, len(req.Claims))
for _, claim := range req.Claims {

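The RollingUpdate documentation above recommends that plugin methods load shared state from a file at the start of a call and store it again before returning, relying on the file-based serialization to avoid races between the old and the new instance. Below is a minimal sketch of that pattern, assuming a hypothetical state layout and helper name (not part of this commit); it would be called from within PrepareResourceClaims/UnprepareResourceClaims while the helper holds the lock.

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// sharedState stands in for whatever per-driver state must survive a rolling
// update; the layout here is purely illustrative.
type sharedState struct {
	PreparedClaims map[string][]string `json:"preparedClaims"`
}

// updateSharedState reads the state file, applies update, and writes the
// result back. This read-modify-write is only safe because the helper already
// holds the serialization lock file for the duration of the gRPC call.
func updateSharedState(path string, update func(*sharedState)) error {
	state := sharedState{PreparedClaims: map[string][]string{}}
	data, err := os.ReadFile(path)
	switch {
	case err == nil:
		if err := json.Unmarshal(data, &state); err != nil {
			return fmt.Errorf("decode shared state %s: %w", path, err)
		}
	case !os.IsNotExist(err):
		return fmt.Errorf("read shared state %s: %w", path, err)
	}
	update(&state)
	out, err := json.MarshalIndent(&state, "", "  ")
	if err != nil {
		return fmt.Errorf("encode shared state: %w", err)
	}
	return os.WriteFile(path, out, 0o600)
}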
View File

@@ -77,6 +77,7 @@ const (
type Nodes struct {
NodeNames []string
tempDir string
}
type Resources struct {
@@ -112,6 +113,8 @@ func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes
}
func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
nodes.tempDir = ginkgo.GinkgoT().TempDir()
ginkgo.By("selecting nodes")
// The kubelet plugin is harder. We deploy the builtin manifest
// after patching in the driver name and all nodes on which we
@@ -219,6 +222,12 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
ginkgo.DeferCleanup(d.TearDown)
}
// NewGetSlices generates a function for gomega.Eventually/Consistently which
// returns the ResourceSliceList.
func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] {
return framework.ListObjects(d.f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
}
type MethodInstance struct {
Nodename string
FullMethod string
@@ -240,6 +249,10 @@ type Driver struct {
// The socket path is still the same.
InstanceSuffix string
// RollingUpdate can be set to true to enable using different socket names
// for different pods and thus seamless upgrades. Must be supported by the kubelet!
RollingUpdate bool
// Name gets derived automatically from the current test namespace and
// (if set) the NameSuffix while setting up the driver for a test.
Name string
@@ -351,7 +364,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
numNodes := int32(len(nodes.NodeNames))
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
registrarSocketFilename := d.Name + "-reg.sock"
instanceName := d.Name + d.InstanceSuffix
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
switch item := item.(type) {
@@ -447,6 +459,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
// All listeners running in this pod use a new unique local port number
// by atomically incrementing this variable.
listenerPort := int32(9000)
rollingUpdateUID := pod.UID
serialize := true
if !d.RollingUpdate {
rollingUpdateUID = ""
// A test might have to execute two gRPC calls in parallel, so only
// serialize when we explicitly want to test a rolling update.
serialize = false
}
plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
kubeletplugin.GRPCVerbosity(0),
kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@@ -456,11 +476,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
return d.streamInterceptor(nodename, srv, ss, info, handler)
}),
kubeletplugin.RollingUpdate(rollingUpdateUID),
kubeletplugin.Serialize(serialize),
kubeletplugin.FlockDirectoryPath(nodes.tempDir),
kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
)
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)

View File

@@ -1802,6 +1802,91 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
ginkgo.It("rolling update", func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 1)
oldDriver := NewDriverInstance(f)
oldDriver.InstanceSuffix = "-old"
oldDriver.RollingUpdate = true
oldDriver.Run(nodes, perNode(1, nodes))
// We expect one ResourceSlice per node from the driver.
getSlices := oldDriver.NewGetSlices()
gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
initialSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
// Same driver name, different socket paths because of rolling update.
newDriver := NewDriverInstance(f)
newDriver.InstanceSuffix = "-new"
newDriver.RollingUpdate = true
newDriver.Run(nodes, perNode(1, nodes))
// Stop old driver instance.
oldDriver.TearDown(ctx)
// Build behaves the same for both driver instances.
b := newBuilderNow(ctx, f, oldDriver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
b.testPod(ctx, f, pod)
// The exact same slices should still exist.
finalSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items))
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
ginkgo.It("failed update", func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 1)
oldDriver := NewDriverInstance(f)
oldDriver.InstanceSuffix = "-old"
oldDriver.RollingUpdate = true
oldDriver.Run(nodes, perNode(1, nodes))
// We expect one ResourceSlice per node from the driver.
getSlices := oldDriver.NewGetSlices()
gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
initialSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
// Same driver name, different socket paths because of rolling update.
newDriver := NewDriverInstance(f)
newDriver.InstanceSuffix = "-new"
newDriver.RollingUpdate = true
newDriver.Run(nodes, perNode(1, nodes))
// Stop new driver instance, simulating the failure of the new instance.
// The kubelet should still have the old instance.
newDriver.TearDown(ctx)
// Build behaves the same for both driver instances.
b := newBuilderNow(ctx, f, oldDriver)
claim := b.externalClaim()
pod := b.podExternal()
b.create(ctx, claim, pod)
b.testPod(ctx, f, pod)
// The exact same slices should still exist.
finalSlices, err := getSlices(ctx)
framework.ExpectNoError(err)
gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items))
// We need to clean up explicitly because the normal
// cleanup doesn't work (driver shuts down first).
// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
})
f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) {
nodes := NewNodesNow(ctx, f, 1, 1)
@@ -1823,7 +1908,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
// "Update" the driver by taking it down and bringing up a new one.
// Pods never run in parallel, similar to how a DaemonSet would update
// its pods.
// its pods when maxSurge is zero.
ginkgo.By("reinstall driver")
start := time.Now()
oldDriver.TearDown(ctx)

View File

@@ -168,7 +168,6 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
kubeletplugin.KubeClient(kubeClient),
kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
kubeletplugin.Serialize(false), // The example plugin does its own locking.
)
d, err := kubeletplugin.Start(ctx, ex, opts...)
if err != nil {