diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index 87280fa93c7..9403829a2fb 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -32,6 +32,11 @@ setting `maxSurge` to a value larger than zero enables such a seamless upgrade. ### In a plugin +*Note*: For DRA, the +[k8s.io/dynamic-resource-allocation](https://pkg.go.dev/k8s.io/dynamic-resource-allocation/kubeletplugin) +helper package offers the `RollingUpdate` option which implements the socket +handling as described in this section. + To support seamless upgrades, each plugin instance must use a unique socket filename. Otherwise the following could happen: - The old instance is registered with `plugin.example.com-reg.sock`. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index b152fb0bbfb..451a9332d4a 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/go-cmp v0.7.0 github.com/onsi/gomega v1.35.1 github.com/stretchr/testify v1.10.0 + go.etcd.io/etcd/client/pkg/v3 v3.5.16 google.golang.org/grpc v1.68.1 k8s.io/api v0.0.0 k8s.io/apimachinery v0.0.0 @@ -57,6 +58,8 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.sum b/staging/src/k8s.io/dynamic-resource-allocation/go.sum index 991e9276662..aae134cac79 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.sum +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.sum @@ -158,6 +158,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28= +go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q= go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE= go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50= @@ -176,9 +177,12 @@ go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJ go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 57602cdaee0..92f1af253e1 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -21,12 +21,14 @@ import ( "errors" "fmt" "net" + "os" "path" "sync" "google.golang.org/grpc" "k8s.io/klog/v2" + "go.etcd.io/etcd/client/pkg/v3/fileutil" resourceapi "k8s.io/api/resource/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -195,8 +197,10 @@ func RegistrarDirectoryPath(path string) Option { // support updates from an installation which used an older release of // of the helper code. // -// The default is -reg.sock. When rolling updates are enabled (not supported yet), +// The default is -reg.sock. When rolling updates are enabled, // it is --reg.sock. +// +// This option and [RollingUpdate] are mutually exclusive. func RegistrarSocketFilename(name string) Option { return func(o *options) error { o.pluginRegistrationEndpoint.file = name @@ -248,6 +252,44 @@ func PluginListener(listen func(ctx context.Context, path string) (net.Listener, } } +// RollingUpdate can be used to enable support for running two plugin instances +// in parallel while a newer instance replaces the older. When enabled, both +// instances must share the same plugin data directory and driver name. +// They create different sockets to allow the kubelet to connect to both at +// the same time. +// +// There is no guarantee which of the two instances are used by kubelet. +// For example, it can happen that a claim gets prepared by one instance +// and then needs to be unprepared by the other. Kubelet then may fall back +// to the first one again for some other operation. In practice this means +// that each instance must be entirely stateless across method calls. +// Serialization (on by default, see [Serialize]) ensures that methods +// are serialized across all instances through file locking. The plugin +// implementation can load shared state from a file at the start +// of a call, execute and then store the updated shared state again. +// +// Passing a non-empty uid enables rolling updates, an empty uid disables it. +// The uid must be the pod UID. A DaemonSet can pass that into the driver container +// via the downward API (https://kubernetes.io/docs/concepts/workloads/pods/downward-api/#downwardapi-fieldRef). +// +// Because new instances cannot remove stale sockets of older instances, +// it is important that each pod shuts down cleanly: it must catch SIGINT/TERM +// and stop the helper instead of quitting immediately. +// +// This depends on support in the kubelet which was added in Kubernetes 1.33. +// Don't use this if it is not certain that the kubelet has that support! +// +// This option and [RegistrarSocketFilename] are mutually exclusive. +func RollingUpdate(uid types.UID) Option { + return func(o *options) error { + o.rollingUpdateUID = uid + + // TODO: ask the kubelet whether that pod is still running and + // clean up leftover sockets? + return nil + } +} + // GRPCInterceptor is called for each incoming gRPC method call. This option // may be used more than once and each interceptor will get called. func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option { @@ -322,6 +364,17 @@ func Serialize(enabled bool) Option { } } +// FlockDir changes where lock files are created and locked. A lock file +// is needed when serializing gRPC calls and rolling updates are enabled. +// The directory must exist and be reserved for exclusive use by the +// driver. The default is the plugin data directory. +func FlockDirectoryPath(path string) Option { + return func(o *options) error { + o.flockDirectoryPath = path + return nil + } +} + type options struct { logger klog.Logger grpcVerbosity int @@ -330,11 +383,13 @@ type options struct { nodeUID types.UID pluginRegistrationEndpoint endpoint pluginDataDirectoryPath string + rollingUpdateUID types.UID draEndpointListen func(ctx context.Context, path string) (net.Listener, error) unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor kubeClient kubernetes.Interface serialize bool + flockDirectoryPath string nodeV1beta1 bool } @@ -344,17 +399,18 @@ type Helper struct { // backgroundCtx is for activities that are started later. backgroundCtx context.Context // cancel cancels the backgroundCtx. - cancel func(cause error) - wg sync.WaitGroup - registrar *nodeRegistrar - pluginServer *grpcServer - plugin DRAPlugin - driverName string - nodeName string - nodeUID types.UID - kubeClient kubernetes.Interface - serialize bool - grpcMutex sync.Mutex + cancel func(cause error) + wg sync.WaitGroup + registrar *nodeRegistrar + pluginServer *grpcServer + plugin DRAPlugin + driverName string + nodeName string + nodeUID types.UID + kubeClient kubernetes.Interface + serialize bool + grpcMutex sync.Mutex + grpcLockFilePath string // Information about resource publishing changes concurrently and thus // must be protected by the mutex. The controller gets started only @@ -392,12 +448,20 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe if o.driverName == "" { return nil, errors.New("driver name must be set") } + if o.rollingUpdateUID != "" && o.pluginRegistrationEndpoint.file != "" { + return nil, errors.New("rolling updates and explicit registration socket filename are mutually exclusive") + } + uidPart := "" + if o.rollingUpdateUID != "" { + uidPart = "-" + string(o.rollingUpdateUID) + } if o.pluginRegistrationEndpoint.file == "" { - o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock" + o.pluginRegistrationEndpoint.file = o.driverName + uidPart + "-reg.sock" } if o.pluginDataDirectoryPath == "" { o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName) } + d := &Helper{ driverName: o.driverName, nodeName: o.nodeName, @@ -406,6 +470,14 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe serialize: o.serialize, plugin: plugin, } + if o.rollingUpdateUID != "" { + dir := o.pluginDataDirectoryPath + if o.flockDirectoryPath != "" { + dir = o.flockDirectoryPath + } + // Enable file locking, required for concurrently running pods. + d.grpcLockFilePath = path.Join(dir, "serialize.lock") + } // Stop calls cancel and therefore both cancellation // and Stop cause goroutines to stop. @@ -434,7 +506,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe var supportedServices []string draEndpoint := endpoint{ dir: o.pluginDataDirectoryPath, - file: "dra.sock", // "dra" is hard-coded. + file: "dra" + uidPart + ".sock", // "dra" is hard-coded. The directory is unique, so we get a unique full path also without the UID. listenFunc: o.draEndpointListen, } pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) { @@ -575,12 +647,25 @@ func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus { // serializeGRPCIfEnabled locks a mutex if serialization is enabled. // Either way it returns a method that the caller must invoke // via defer. -func (d *Helper) serializeGRPCIfEnabled() func() { +func (d *Helper) serializeGRPCIfEnabled() (func(), error) { if !d.serialize { - return func() {} + return func() {}, nil } + + // If rolling updates are enabled, we cannot do only in-memory locking. + // We must use file locking. + if d.grpcLockFilePath != "" { + file, err := fileutil.LockFile(d.grpcLockFilePath, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return nil, fmt.Errorf("lock file: %w", err) + } + return func() { + _ = file.Close() + }, nil + } + d.grpcMutex.Lock() - return d.grpcMutex.Unlock + return d.grpcMutex.Unlock, nil } // nodePluginImplementation is a thin wrapper around the helper instance. @@ -597,7 +682,11 @@ func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req return nil, fmt.Errorf("get resource claims: %w", err) } - defer d.serializeGRPCIfEnabled()() + unlock, err := d.serializeGRPCIfEnabled() + if err != nil { + return nil, fmt.Errorf("serialize gRPC: %w", err) + } + defer unlock() result, err := d.plugin.PrepareResourceClaims(ctx, claims) if err != nil { @@ -659,7 +748,11 @@ func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims // NodeUnprepareResources implements [draapi.NodeUnprepareResources]. func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) { - defer d.serializeGRPCIfEnabled() + unlock, err := d.serializeGRPCIfEnabled() + if err != nil { + return nil, fmt.Errorf("serialize gRPC: %w", err) + } + defer unlock() claims := make([]NamespacedObject, 0, len(req.Claims)) for _, claim := range req.Claims { diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index a451be4af68..ba6ef51fa3b 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -77,6 +77,7 @@ const ( type Nodes struct { NodeNames []string + tempDir string } type Resources struct { @@ -112,6 +113,8 @@ func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes } func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) { + nodes.tempDir = ginkgo.GinkgoT().TempDir() + ginkgo.By("selecting nodes") // The kubelet plugin is harder. We deploy the builtin manifest // after patching in the driver name and all nodes on which we @@ -219,6 +222,12 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP ginkgo.DeferCleanup(d.TearDown) } +// NewGetSlices generates a function for gomega.Eventually/Consistently which +// returns the ResourceSliceList. +func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] { + return framework.ListObjects(d.f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name}) +} + type MethodInstance struct { Nodename string FullMethod string @@ -240,6 +249,10 @@ type Driver struct { // The socket path is still the same. InstanceSuffix string + // RollingUpdate can be set to true to enable using different socket names + // for different pods and thus seamless upgrades. Must be supported by the kubelet! + RollingUpdate bool + // Name gets derived automatically from the current test namespace and // (if set) the NameSuffix while setting up the driver for a test. Name string @@ -351,7 +364,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ numNodes := int32(len(nodes.NodeNames)) pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name) registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") - registrarSocketFilename := d.Name + "-reg.sock" instanceName := d.Name + d.InstanceSuffix err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error { switch item := item.(type) { @@ -447,6 +459,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ // All listeners running in this pod use a new unique local port number // by atomically incrementing this variable. listenerPort := int32(9000) + rollingUpdateUID := pod.UID + serialize := true + if !d.RollingUpdate { + rollingUpdateUID = "" + // A test might have to execute two gRPC calls in parallel, so only + // serialize when we explicitly want to test a rolling update. + serialize = false + } plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps, kubeletplugin.GRPCVerbosity(0), kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { @@ -456,11 +476,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ return d.streamInterceptor(nodename, srv, ss, info, handler) }), + kubeletplugin.RollingUpdate(rollingUpdateUID), + kubeletplugin.Serialize(serialize), + kubeletplugin.FlockDirectoryPath(nodes.tempDir), + kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath), kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)), kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath), - kubeletplugin.RegistrarSocketFilename(registrarSocketFilename), kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 80fd5806cb2..091cfdd6fd7 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -1802,6 +1802,91 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) }) + ginkgo.It("rolling update", func(ctx context.Context) { + nodes := NewNodesNow(ctx, f, 1, 1) + + oldDriver := NewDriverInstance(f) + oldDriver.InstanceSuffix = "-old" + oldDriver.RollingUpdate = true + oldDriver.Run(nodes, perNode(1, nodes)) + + // We expect one ResourceSlice per node from the driver. + getSlices := oldDriver.NewGetSlices() + gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames)))) + initialSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + + // Same driver name, different socket paths because of rolling update. + newDriver := NewDriverInstance(f) + newDriver.InstanceSuffix = "-new" + newDriver.RollingUpdate = true + newDriver.Run(nodes, perNode(1, nodes)) + + // Stop old driver instance. + oldDriver.TearDown(ctx) + + // Build behaves the same for both driver instances. + b := newBuilderNow(ctx, f, oldDriver) + claim := b.externalClaim() + pod := b.podExternal() + b.create(ctx, claim, pod) + b.testPod(ctx, f, pod) + + // The exact same slices should still exist. + finalSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items)) + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). + // framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) + }) + + ginkgo.It("failed update", func(ctx context.Context) { + nodes := NewNodesNow(ctx, f, 1, 1) + + oldDriver := NewDriverInstance(f) + oldDriver.InstanceSuffix = "-old" + oldDriver.RollingUpdate = true + oldDriver.Run(nodes, perNode(1, nodes)) + + // We expect one ResourceSlice per node from the driver. + getSlices := oldDriver.NewGetSlices() + gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames)))) + initialSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + + // Same driver name, different socket paths because of rolling update. + newDriver := NewDriverInstance(f) + newDriver.InstanceSuffix = "-new" + newDriver.RollingUpdate = true + newDriver.Run(nodes, perNode(1, nodes)) + + // Stop new driver instance, simulating the failure of the new instance. + // The kubelet should still have the old instance. + newDriver.TearDown(ctx) + + // Build behaves the same for both driver instances. + b := newBuilderNow(ctx, f, oldDriver) + claim := b.externalClaim() + pod := b.podExternal() + b.create(ctx, claim, pod) + b.testPod(ctx, f, pod) + + // The exact same slices should still exist. + finalSlices, err := getSlices(ctx) + framework.ExpectNoError(err) + gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items)) + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). + // framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) + }) + f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) { nodes := NewNodesNow(ctx, f, 1, 1) @@ -1823,7 +1908,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, // "Update" the driver by taking it down and bringing up a new one. // Pods never run in parallel, similar to how a DaemonSet would update - // its pods. + // its pods when maxSurge is zero. ginkgo.By("reinstall driver") start := time.Now() oldDriver.TearDown(ctx) diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go index 498e02fc715..875fdf886d8 100644 --- a/test/e2e/dra/test-driver/app/kubeletplugin.go +++ b/test/e2e/dra/test-driver/app/kubeletplugin.go @@ -168,7 +168,6 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube kubeletplugin.KubeClient(kubeClient), kubeletplugin.GRPCInterceptor(ex.recordGRPCCall), kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream), - kubeletplugin.Serialize(false), // The example plugin does its own locking. ) d, err := kubeletplugin.Start(ctx, ex, opts...) if err != nil {