From 0490b9f0b72155b5a016c4917495fe302f3d5e36 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 27 Jan 2025 11:20:01 +0100 Subject: [PATCH 1/4] kubelet: document seamless upgrade support and guidance This tries to capture the current state of affairs and a potential plan for supporting seamless upgrades better. --- .../pluginmanager/pluginwatcher/README.md | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index aebbfd10254..5de09c14f9c 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -16,6 +16,86 @@ there. This socket filename should not start with a '.' as it will be ignored. +To avoid conflicts between different plugins, the recommendation is to use +`[-].sock` as filename. `` +should end with a DNS domain that is unique for the plugin. Each time a plugin +starts, it has to delete old sockets if they exist and listen anew under the +same filename. + +## Seamless Upgrade + +To avoid downtime of a plugin on a node, it would be nice to support running an +old plugin in parallel to the new plugin. When deploying with a DaemonSet, +setting `maxSurge` to a value larger than zero enables such a seamless upgrade. + +**Warning**: Such a seamless upgrade **is not** supported at the moment. This +section merely describes what would have to be changed to make it work. + +### In a plugin + +To support seamless upgrades, each plugin instance must use a unique +socket filename. Otherwise the following could happen: +- The old instance is registered with `plugin.example.com-reg.sock`. +- The new instance starts, unlinks that file, and starts listening on it again. +- In parallel, the kubelet notices the removal and unregisters the plugin + before probing the new instance, thus breaking the seamless upgrade. + +Even if the timing is more favorable and unregistration is avoided, using the +same socket is problematic: if the new instance fails, the kubelet cannot fall +back to the old instance because that old instance is not listening to the +socket that is available under `plugin.example.com-reg.sock`. + +This can be achieved in a DaemonSet by passing the UID of the pod into the pod +through the downward API. New instances may try to clean up stale sockets of +older instances, but have to be absolutely sure that those sockets really +aren't in use anymore. Each instance should catch termination signals and clean +up after itself. Then sockets only leak during abnormal events (power loss, +killing with SIGKILL). + +Last but not least, both plugin instances must be usable in parallel. It is not +predictable which instance the kubelet will use for which request. + +### In the kubelet + +For such a seamless upgrade with different sockets per plugin to work reliably, +the handler for the plugin type must track all registered instances. Then if +one of them fails and gets unregistered, it can fall back to some +other. Picking the most recently registered instance is a good heuristic. This +isn't perfect because after a kubelet restart, plugin instances get registered +in a random order. Restarting the kubelet in the middle of an upgrade should be +rare. + +At the moment, none of the existing handlers support such seamless upgrades: + +- The device plugin handler suffers from temporarily removing the extended + resources during an upgrade. 
A proposed fix is pending in + https://github.com/kubernetes/kubernetes/pull/127821. + +- The CSI handler [tries to determine which instance is newer](https://github.com/kubernetes/kubernetes/blob/7140b4910c6c1179c9778a7f3bb8037356febd58/pkg/volume/csi/csi_plugin.go#L115-L125) based on the supported version(s) and + only remembers that one. If that newest instance fails, there is no fallback. + + In practice, most CSI drivers probably all pass [the hard-coded "1.0.0"](https://github.com/kubernetes-csi/node-driver-registrar/blob/27700e2962cd35b9f2336a156146181e5c75399e/cmd/csi-node-driver-registrar/main.go#L72) + from the csi-node-registrar as supported version, so this version + selection mechanism isn't used at all. + +- The DRA handler only remembers the most recently registered instance. + +### Deployment + +Deploying a plugin with support for seamless upgrades and per-instance socket +filenames is *not* compatible with a kubelet version that does not have support +for seamless upgrades yet. It breaks like this: + +- New instance starts, gets registered and replaces the old one. +- Old instance stops, removing its socket. +- The kubelet notices that, unregisters the plugin. +- The plugin handler removes *the new* instance because it ignores the socket path -> no instance left. + +Plugin authors either have to assume that the cluster has a recent enough +kubelet or rely on labeling nodes with support. Then the plugin can use one +simple DaemonSet for nodes without support and another, more complex one where +`maxSurge` is increased to enable seamless upgrades on nodes which support it. +No such label is specified at the moment. ## gRPC Service Lifecycle From 760903c0de6fe1724b99916fa59b16dfd71617b3 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Mon, 27 Jan 2025 13:35:13 +0100 Subject: [PATCH 2/4] DRA kubelet: give DRA drivers a 30 second grace period for updates When doing an update of a DaemonSet, first the old pod gets stopped and then the new one is started. This causes the kubelet to remove all ResourceSlices directly after removal and forces the new pod to recreate all of them. Now the kubelet waits 30 seconds before it deletes ResourceSlices. If a new driver registers during that period, nothing is done at all. The new driver finds the existing ResourceSlices and only needs to update them if something changed. The downside is that if the driver gets removed permanently, this creates a delay where pods might still get scheduled to the node although the driver is not going to run there anymore and thus the pods will be stuck. --- pkg/kubelet/cm/dra/manager.go | 15 +++- pkg/kubelet/cm/dra/manager_test.go | 6 +- pkg/kubelet/cm/dra/plugin/registration.go | 80 +++++++++++++++++-- .../cm/dra/plugin/registration_test.go | 2 +- test/e2e/dra/deploy.go | 64 +++++++++++---- test/e2e/dra/dra.go | 59 ++++++++++++++ 6 files changed, 201 insertions(+), 25 deletions(-) diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 8e965c6b694..26043a0fd28 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n } func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler { - return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode)) + // The time that DRA drivers have to come back after being unregistered + // before the kubelet removes their ResourceSlices. 
+ // + // This must be long enough to actually allow stopping a pod and + // starting the replacement (otherwise ResourceSlices get deleted + // unnecessarily) and not too long (otherwise the time window were + // pods might still get scheduled to the node after removal of a + // driver is too long). + // + // 30 seconds might be long enough for a simple container restart. + // If a DRA driver wants to be sure that slices don't get wiped, + // it should use rolling updates. + wipingDelay := 30 * time.Second + return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay)) } // Start starts the reconcile loop of the manager. diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index bc2448690a8..908cd735847 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -580,7 +580,7 @@ func TestPrepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler(nil, getFakeNode) + plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */) if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } @@ -717,7 +717,7 @@ func TestUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler(nil, getFakeNode) + plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */) if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } @@ -887,7 +887,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) { } defer draServerInfo.teardownFn() - plg := plugin.NewRegistrationHandler(nil, getFakeNode) + plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */) if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil { t.Fatalf("failed to register plugin %s, err: %v", driverName, err) } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 23ddbb73a70..2af7db5bc3b 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "slices" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -53,6 +54,18 @@ type RegistrationHandler struct { backgroundCtx context.Context kubeClient kubernetes.Interface getNode func() (*v1.Node, error) + wipingDelay time.Duration + + mutex sync.Mutex + + // pendingWipes maps a plugin name to a cancel function for + // wiping of that plugin's ResourceSlices. Entries get added + // in DeRegisterPlugin and check in RegisterPlugin. If + // wiping is pending during RegisterPlugin, it gets canceled. + // + // Must use pointers to functions because the entries have to + // be comparable. + pendingWipes map[string]*context.CancelCauseFunc } var _ cache.PluginHandler = &RegistrationHandler{} @@ -62,12 +75,14 @@ var _ cache.PluginHandler = &RegistrationHandler{} // Must only be called once per process because it manages global state. // If a kubeClient is provided, then it synchronizes ResourceSlices // with the resource information provided by plugins. 
-func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler { +func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler { handler := &RegistrationHandler{ // The context and thus logger should come from the caller. backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")), kubeClient: kubeClient, getNode: getNode, + wipingDelay: wipingDelay, + pendingWipes: make(map[string]*context.CancelCauseFunc), } // When kubelet starts up, no DRA driver has registered yet. None of @@ -77,19 +92,34 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1 // to start up. // // This has to run in the background. - go handler.wipeResourceSlices("") + logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup") + ctx := klog.NewContext(handler.backgroundCtx, logger) + go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */) return handler } // wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver. -func (h *RegistrationHandler) wipeResourceSlices(driver string) { +// Wiping will delay for a while and can be canceled by canceling the context. +func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) { if h.kubeClient == nil { return } - ctx := h.backgroundCtx logger := klog.FromContext(ctx) + if delay != 0 { + // Before we start deleting, give the driver time to bounce back. + // Perhaps it got removed as part of a DaemonSet update and the + // replacement pod is about to start. + logger.V(4).Info("Starting to wait before wiping ResourceSlices", "delay", delay) + select { + case <-ctx.Done(): + logger.V(4).Info("Aborting wiping of ResourceSlices", "reason", context.Cause(ctx)) + case <-time.After(delay): + logger.V(4).Info("Starting to wipe ResourceSlices after waiting", "delay", delay) + } + } + backoff := wait.Backoff{ Duration: time.Second, Factor: 2, @@ -184,6 +214,15 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint) } + // Now cancel any pending ResourceSlice wiping for this plugin. + // Only needs to be done once. + h.mutex.Lock() + defer h.mutex.Unlock() + if cancel := h.pendingWipes[pluginName]; cancel != nil { + (*cancel)(errors.New("new plugin instance registered")) + delete(h.pendingWipes, pluginName) + } + return nil } @@ -225,11 +264,42 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { logger := klog.FromContext(p.backgroundCtx) logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) + // Prepare for canceling the background wiping. This needs to run + // in the context of the registration handler, the one from + // the plugin is canceled. + logger = klog.FromContext(h.backgroundCtx) + logger = klog.LoggerWithName(logger, "driver-cleanup") + logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + ctx, cancel := context.WithCancelCause(h.backgroundCtx) + ctx = klog.NewContext(ctx, logger) + // Clean up the ResourceSlices for the deleted Plugin since it // may have died without doing so itself and might never come // back. 
- go h.wipeResourceSlices(pluginName) + // + // May get canceled if the plugin comes back quickly enough + // (see RegisterPlugin). + h.mutex.Lock() + defer h.mutex.Unlock() + if cancel := h.pendingWipes[pluginName]; cancel != nil { + (*cancel)(errors.New("plugin deregistered a second time")) + } + h.pendingWipes[pluginName] = &cancel + go func() { + defer func() { + h.mutex.Lock() + defer h.mutex.Unlock() + + // Cancel our own context, but remove it from the map only if it + // is the current entry. Perhaps it already got replaced. + cancel(errors.New("wiping done")) + if h.pendingWipes[pluginName] == &cancel { + delete(h.pendingWipes, pluginName) + } + }() + h.wipeResourceSlices(ctx, h.wipingDelay, pluginName) + }() return } diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 949682f6aab..4920fdf34ec 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -157,7 +157,7 @@ func TestRegistrationHandler(t *testing.T) { } // The handler wipes all slices at startup. - handler := NewRegistrationHandler(client, getFakeNode) + handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */) requireNoSlices := func() { t.Helper() if client == nil { diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go index 37fc5226e0e..a451be4af68 100644 --- a/test/e2e/dra/deploy.go +++ b/test/e2e/dra/deploy.go @@ -66,6 +66,7 @@ import ( e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" "k8s.io/kubernetes/test/e2e/storage/drivers/proxy" "k8s.io/kubernetes/test/e2e/storage/utils" + "k8s.io/utils/ptr" "sigs.k8s.io/yaml" ) @@ -214,7 +215,6 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP // not run on all nodes. resources.Nodes = nodes.NodeNames } - ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last. d.SetUp(nodes, resources, devicesPerNode...) ginkgo.DeferCleanup(d.TearDown) } @@ -227,12 +227,22 @@ type MethodInstance struct { type Driver struct { f *framework.Framework ctx context.Context - cleanup []func() // executed first-in-first-out + cleanup []func(context.Context) // executed first-in-first-out wg sync.WaitGroup serviceAccountName string + // NameSuffix can be set while registering a test to deploy different + // drivers in the same test namespace. NameSuffix string - Name string + + // InstanceSuffix can be set while registering a test to deploy two different + // instances of the same driver. Used to generate unique objects in the API server. + // The socket path is still the same. + InstanceSuffix string + + // Name gets derived automatically from the current test namespace and + // (if set) the NameSuffix while setting up the driver for a test. + Name string // Nodes contains entries for each node selected for a test when the test runs. // In addition, there is one entry for a fictional node. 
@@ -263,9 +273,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ ctx, cancel := context.WithCancel(context.Background()) logger := klog.FromContext(ctx) logger = klog.LoggerWithValues(logger, "driverName", d.Name) + if d.InstanceSuffix != "" { + instance, _ := strings.CutPrefix(d.InstanceSuffix, "-") + logger = klog.LoggerWithValues(logger, "instance", instance) + } ctx = klog.NewContext(ctx, logger) d.ctx = ctx - d.cleanup = append(d.cleanup, cancel) + d.cleanup = append(d.cleanup, func(context.Context) { cancel() }) if !resources.NodeLocal { // Publish one resource pool with "network-attached" devices. @@ -323,28 +337,32 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ } // Create service account and corresponding RBAC rules. - d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account" + d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account" content := pluginPermissions content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name) - content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name) + content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix) d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name) + // Using a ReplicaSet instead of a DaemonSet has the advantage that we can control + // the lifecycle explicitly, in particular run two pods per node long enough to + // run checks. instanceKey := "app.kubernetes.io/instance" rsName := "" numNodes := int32(len(nodes.NodeNames)) pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name) registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry") registrarSocketFilename := d.Name + "-reg.sock" + instanceName := d.Name + d.InstanceSuffix err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error { switch item := item.(type) { case *appsv1.ReplicaSet: - item.Name += d.NameSuffix + item.Name += d.NameSuffix + d.InstanceSuffix rsName = item.Name item.Spec.Replicas = &numNodes - item.Spec.Selector.MatchLabels[instanceKey] = d.Name - item.Spec.Template.Labels[instanceKey] = d.Name + item.Spec.Selector.MatchLabels[instanceKey] = instanceName + item.Spec.Template.Labels[instanceKey] = instanceName item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName - item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name + item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = instanceName item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{ RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ NodeSelectorTerms: []v1.NodeSelectorTerm{ @@ -376,7 +394,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil { framework.ExpectNoError(err, "all kubelet plugin proxies running") } - requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name}) + requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName}) framework.ExpectNoError(err, "create label selector requirement") selector := labels.NewSelector().Add(*requirement) pods, 
err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()}) @@ -446,9 +464,20 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[ kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)), ) framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName) - d.cleanup = append(d.cleanup, func() { + d.cleanup = append(d.cleanup, func(ctx context.Context) { // Depends on cancel being called first. plugin.Stop() + + // Also explicitly stop all pods. + ginkgo.By("scaling down driver proxy pods for " + d.Name) + rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{}) + framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name) + rs.Spec.Replicas = ptr.To(int32(0)) + rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{}) + framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name) + if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil { + framework.ExpectNoError(err, "all kubelet plugin proxies stopped") + } }) d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient} } @@ -717,14 +746,19 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter { return writer } -func (d *Driver) TearDown() { +func (d *Driver) TearDown(ctx context.Context) { for _, c := range d.cleanup { - c() + c(ctx) } d.cleanup = nil d.wg.Wait() } +// IsGone checks that the kubelet is done with the driver. +// This is done by waiting for the kubelet to remove the +// driver's ResourceSlices, which takes at least 5 minutes +// because of the delay in the kubelet. Only use this in slow +// tests... func (d *Driver) IsGone(ctx context.Context) { gomega.Eventually(ctx, func(ctx context.Context) ([]resourceapi.ResourceSlice, error) { slices, err := d.f.ClientSet.ResourceV1beta1().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name}) @@ -732,7 +766,7 @@ func (d *Driver) IsGone(ctx context.Context) { return nil, err } return slices.Items, err - }).Should(gomega.BeEmpty()) + }).WithTimeout(7 * time.Minute).Should(gomega.BeEmpty()) } func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index b93c44a9b1e..80fd5806cb2 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -1801,6 +1801,65 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation, framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) }) + + f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) { + nodes := NewNodesNow(ctx, f, 1, 1) + + // Same driver name, same socket path. + oldDriver := NewDriverInstance(f) + oldDriver.InstanceSuffix = "-old" + oldDriver.Run(nodes, perNode(1, nodes)) + + // Collect set of resource slices for that driver. 
+ listSlices := framework.ListObjects(f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{ + FieldSelector: "spec.driver=" + oldDriver.Name, + }) + gomega.Eventually(ctx, listSlices).Should(gomega.HaveField("Items", gomega.Not(gomega.BeEmpty())), "driver should have published ResourceSlices, got none") + oldSlices, err := listSlices(ctx) + framework.ExpectNoError(err, "list slices published by old driver") + if len(oldSlices.Items) == 0 { + framework.Fail("driver should have published ResourceSlices, got none") + } + + // "Update" the driver by taking it down and bringing up a new one. + // Pods never run in parallel, similar to how a DaemonSet would update + // its pods. + ginkgo.By("reinstall driver") + start := time.Now() + oldDriver.TearDown(ctx) + newDriver := NewDriverInstance(f) + newDriver.InstanceSuffix = "-new" + newDriver.Run(nodes, perNode(1, nodes)) + updateDuration := time.Since(start) + + // Build behaves the same for both driver instances. + b := newBuilderNow(ctx, f, oldDriver) + claim := b.externalClaim() + pod := b.podExternal() + b.create(ctx, claim, pod) + b.testPod(ctx, f, pod) + + // The slices should have survived the update, but only if it happened + // quickly enough. If it took too long, the kubelet considered the driver + // gone and removed them. + if updateDuration <= 3*time.Minute { + newSlices, err := listSlices(ctx) + framework.ExpectNoError(err, "list slices again") + gomega.Expect(newSlices.Items).To(gomega.ConsistOf(oldSlices.Items)) + } + + // We need to clean up explicitly because the normal + // cleanup doesn't work (driver shuts down first). + // framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)) + + // Now shut down for good and wait for the kubelet to react. + // This takes time... + ginkgo.By("uninstalling driver and waiting for ResourceSlice wiping") + newDriver.TearDown(ctx) + newDriver.IsGone(ctx) + }) }) // builder contains a running counter to make objects unique within thir From b471c2c11f6cf40da78a3cfa3626449d247b05ad Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 28 Jan 2025 16:20:07 +0100 Subject: [PATCH 3/4] DRA kubelet: support rolling upgrades The key difference is that the kubelet must remember all plugin instances because it could always happen that the new instance dies and leaves only the old one running. The endpoints of each instance must be different. Registering a plugin with the same endpoint as some other instance is not supported and triggers an error, which should get reported as "not registered" to the plugin. This should only happen when the kubelet missed some unregistration event and re-registers the same instance again. The recovery in this case is for the plugin to shut down, remove its socket, which should get observed by kubelet, and then try again after a restart. 
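The bookkeeping described above can be summarized in a short, self-contained sketch. This is illustrative only: the real code is in pkg/kubelet/cm/dra/plugin/plugins_store.go further down in this patch, and the instanceStore/instance names are made up for the example.

package main

import (
    "fmt"
    "slices"
    "sync"
)

type instance struct {
    name     string // plugin name, e.g. the DRA driver name
    endpoint string // path of the registration socket, unique per instance
}

type instanceStore struct {
    mutex sync.Mutex
    // Plugin name -> instances in the order in which they registered.
    store map[string][]*instance
}

// add registers another instance. Two instances of the same plugin must not
// share an endpoint, otherwise one could hijack the socket of the other.
func (s *instanceStore) add(p *instance) error {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    if s.store == nil {
        s.store = make(map[string][]*instance)
    }
    for _, old := range s.store[p.name] {
        if old.endpoint == p.endpoint {
            return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name)
        }
    }
    s.store[p.name] = append(s.store[p.name], p)
    return nil
}

// get returns the most recently registered instance, the heuristic for
// preferring the newer instance while keeping the older one as fallback.
func (s *instanceStore) get(name string) *instance {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    instances := s.store[name]
    if len(instances) == 0 {
        return nil
    }
    return instances[len(instances)-1]
}

// remove drops one instance identified by plugin name and endpoint and
// reports whether it was the last one, i.e. whether cleanup such as
// ResourceSlice wiping may start.
func (s *instanceStore) remove(name, endpoint string) (removed *instance, last bool) {
    s.mutex.Lock()
    defer s.mutex.Unlock()
    instances := s.store[name]
    i := slices.IndexFunc(instances, func(p *instance) bool { return p.endpoint == endpoint })
    if i == -1 {
        return nil, false
    }
    removed = instances[i]
    if len(instances) == 1 {
        delete(s.store, name)
        return removed, true
    }
    s.store[name] = slices.Delete(instances, i, i+1)
    return removed, false
}

func main() {
    var s instanceStore
    _ = s.add(&instance{name: "dra.example.com", endpoint: "dra-old.sock"})
    _ = s.add(&instance{name: "dra.example.com", endpoint: "dra-new.sock"})
    fmt.Println(s.get("dra.example.com").endpoint) // dra-new.sock
    _, last := s.remove("dra.example.com", "dra-new.sock")
    fmt.Println(last)                              // false, the old instance is still registered
    fmt.Println(s.get("dra.example.com").endpoint) // dra-old.sock, the fallback
}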
--- .../devicemanager/plugin/v1beta1/handler.go | 5 +- pkg/kubelet/cm/dra/manager_test.go | 6 +- pkg/kubelet/cm/dra/plugin/plugin_test.go | 6 +- pkg/kubelet/cm/dra/plugin/plugins_store.go | 59 ++++++++++++------- .../cm/dra/plugin/plugins_store_test.go | 30 ++++++---- pkg/kubelet/cm/dra/plugin/registration.go | 21 ++++--- .../cm/dra/plugin/registration_test.go | 9 ++- .../cache/actual_state_of_world.go | 1 + pkg/kubelet/pluginmanager/cache/types.go | 2 +- .../operationexecutor/operation_generator.go | 3 +- .../pluginmanager/plugin_manager_test.go | 39 +++++++++--- .../pluginmanager/pluginwatcher/README.md | 9 +-- .../reconciler/reconciler_test.go | 2 +- pkg/volume/csi/csi_plugin.go | 4 +- 14 files changed, 128 insertions(+), 68 deletions(-) diff --git a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go index d1bca0cccae..204afd3d1b1 100644 --- a/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go +++ b/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go @@ -43,8 +43,8 @@ func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []s return s.connectClient(pluginName, endpoint) } -func (s *server) DeRegisterPlugin(pluginName string) { - klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName) +func (s *server) DeRegisterPlugin(pluginName, endpoint string) { + klog.V(2).InfoS("Deregistering plugin", "plugin", pluginName, "endpoint", endpoint) client := s.getClient(pluginName) if client != nil { s.disconnectClient(pluginName, client) @@ -88,7 +88,6 @@ func (s *server) disconnectClient(name string, c Client) error { s.deregisterClient(name) return c.Disconnect() } - func (s *server) registerClient(name string, c Client) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index 908cd735847..f65ac87575a 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -584,7 +584,7 @@ func TestPrepareResources(t *testing.T) { if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests if test.claimInfo != nil { manager.cache.add(test.claimInfo) @@ -721,7 +721,7 @@ func TestUnprepareResources(t *testing.T) { if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } - defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests + defer plg.DeRegisterPlugin(test.driverName, draServerInfo.socketName) // for sake of next tests manager := &ManagerImpl{ kubeClient: fakeKubeClient, @@ -891,7 +891,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) { if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil { t.Fatalf("failed to register plugin %s, err: %v", driverName, err) } - defer plg.DeRegisterPlugin(driverName) + defer plg.DeRegisterPlugin(driverName, draServerInfo.socketName) // Create ClaimInfo cache cache, err := newClaimInfoCache(t.TempDir(), draManagerStateFileName) diff --git a/pkg/kubelet/cm/dra/plugin/plugin_test.go 
b/pkg/kubelet/cm/dra/plugin/plugin_test.go index 396c4439388..95f10083702 100644 --- a/pkg/kubelet/cm/dra/plugin/plugin_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugin_test.go @@ -136,7 +136,7 @@ func TestGRPCConnIsReused(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) // we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused for i := 0; i < 2; i++ { @@ -210,7 +210,7 @@ func TestNewDRAPluginClient(t *testing.T) { setup: func(name string) tearDown { draPlugins.add(&Plugin{name: name}) return func() { - draPlugins.delete(name) + draPlugins.remove(name, "") } }, pluginName: "dummy-plugin", @@ -298,7 +298,7 @@ func TestGRPCMethods(t *testing.T) { } draPlugins.add(p) - defer draPlugins.delete(pluginName) + defer draPlugins.remove(pluginName, addr) client, err := NewDRAPluginClient(pluginName) if err != nil { diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store.go b/pkg/kubelet/cm/dra/plugin/plugins_store.go index d2f36c17849..cc2fc240b5b 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store.go @@ -18,13 +18,16 @@ package plugin import ( "errors" + "fmt" + "slices" "sync" ) // PluginsStore holds a list of DRA Plugins. type pluginsStore struct { sync.RWMutex - store map[string]*Plugin + // plugin name -> Plugin in the order in which they got added + store map[string][]*Plugin } // draPlugins map keeps track of all registered DRA plugins on the node @@ -37,43 +40,57 @@ func (s *pluginsStore) get(pluginName string) *Plugin { s.RLock() defer s.RUnlock() - return s.store[pluginName] + instances := s.store[pluginName] + if len(instances) == 0 { + return nil + } + // Heuristic: pick the most recent one. It's most likely + // the newest, except when kubelet got restarted and registered + // all running plugins in random order. + return instances[len(instances)-1] } // Set lets you save a DRA Plugin to the list and give it a specific name. // This method is protected by a mutex. -func (s *pluginsStore) add(p *Plugin) (replacedPlugin *Plugin, replaced bool) { +func (s *pluginsStore) add(p *Plugin) error { s.Lock() defer s.Unlock() if s.store == nil { - s.store = make(map[string]*Plugin) + s.store = make(map[string][]*Plugin) } - - replacedPlugin, exists := s.store[p.name] - s.store[p.name] = p - - if replacedPlugin != nil && replacedPlugin.cancel != nil { - replacedPlugin.cancel(errors.New("plugin got replaced")) + for _, oldP := range s.store[p.name] { + if oldP.endpoint == p.endpoint { + // One plugin instance cannot hijack the endpoint of another instance. + return fmt.Errorf("endpoint %s already registered for plugin %s", p.endpoint, p.name) + } } - - return replacedPlugin, exists + s.store[p.name] = append(s.store[p.name], p) + return nil } -// Delete lets you delete a DRA Plugin by name. -// This method is protected by a mutex. -func (s *pluginsStore) delete(pluginName string) *Plugin { +// remove lets you remove one endpoint for a DRA Plugin. +// This method is protected by a mutex. 
It returns the +// plugin if found and true if that was the last instance +func (s *pluginsStore) remove(pluginName, endpoint string) (*Plugin, bool) { s.Lock() defer s.Unlock() - p, exists := s.store[pluginName] - if !exists { - return nil + instances := s.store[pluginName] + i := slices.IndexFunc(instances, func(p *Plugin) bool { return p.endpoint == endpoint }) + if i == -1 { + return nil, false } + p := instances[i] + last := len(instances) == 1 + if last { + delete(s.store, pluginName) + } else { + s.store[pluginName] = slices.Delete(instances, i, i+1) + } + if p.cancel != nil { p.cancel(errors.New("plugin got removed")) } - delete(s.store, pluginName) - - return p + return p, last } diff --git a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go index b4d95abf30e..2550aaa3dd1 100644 --- a/pkg/kubelet/cm/dra/plugin/plugins_store_test.go +++ b/pkg/kubelet/cm/dra/plugin/plugins_store_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAddSameName(t *testing.T) { @@ -30,26 +31,35 @@ func TestAddSameName(t *testing.T) { firstWasCancelled := false p := &Plugin{ - name: pluginName, - cancel: func(err error) { firstWasCancelled = true }, + name: pluginName, + endpoint: "old", + cancel: func(err error) { firstWasCancelled = true }, } // ensure the plugin we are using is registered - draPlugins.add(p) - defer draPlugins.delete(p.name) + require.NoError(t, draPlugins.add(p)) + defer draPlugins.remove(p.name, p.endpoint) assert.False(t, firstWasCancelled, "should not cancel context after the first call") + // Same name, same endpoint -> error. + require.Error(t, draPlugins.add(p)) + secondWasCancelled := false p2 := &Plugin{ - name: pluginName, - cancel: func(err error) { secondWasCancelled = true }, + name: pluginName, + endpoint: "new", + cancel: func(err error) { secondWasCancelled = true }, } + require.NoError(t, draPlugins.add(p2)) + defer draPlugins.remove(p2.name, p2.endpoint) - draPlugins.add(p2) - defer draPlugins.delete(p2.name) + assert.False(t, firstWasCancelled, "should not cancel context after registering the second instance") + assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") - assert.True(t, firstWasCancelled, "should cancel context after the second call") + // Remove old plugin. + draPlugins.remove(p.name, p.endpoint) + assert.True(t, firstWasCancelled, "should have canceled context after the explicit removal") assert.False(t, secondWasCancelled, "should not cancel context of a new plugin") } @@ -65,7 +75,7 @@ func TestDelete(t *testing.T) { // ensure the plugin we are using is registered draPlugins.add(p) - draPlugins.delete(p.name) + draPlugins.remove(p.name, "") assert.True(t, wasCancelled, "should cancel context after the second call") } diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go index 2af7db5bc3b..1d95e582e0a 100644 --- a/pkg/kubelet/cm/dra/plugin/registration.go +++ b/pkg/kubelet/cm/dra/plugin/registration.go @@ -178,10 +178,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // into all log output related to the plugin. 
ctx := h.backgroundCtx logger := klog.FromContext(ctx) - logger = klog.LoggerWithValues(logger, "pluginName", pluginName) + logger = klog.LoggerWithValues(logger, "pluginName", pluginName, "endpoint", endpoint) ctx = klog.NewContext(ctx, logger) - logger.V(3).Info("Register new DRA plugin", "endpoint", endpoint) + logger.V(3).Info("Register new DRA plugin") chosenService, err := h.validateSupportedServices(pluginName, supportedServices) if err != nil { @@ -209,9 +209,10 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, // Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key // all other DRA components will be able to get the actual socket of DRA plugins by its name. - - if oldPlugin, replaced := draPlugins.add(pluginInstance); replaced { - logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint) + if err := draPlugins.add(pluginInstance); err != nil { + cancel(err) + // No wrapping, the error already contains details. + return err } // Now cancel any pending ResourceSlice wiping for this plugin. @@ -259,10 +260,14 @@ func (h *RegistrationHandler) validateSupportedServices(pluginName string, suppo // DeRegisterPlugin is called when a plugin has removed its socket, // signaling it is no longer available. -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - if p := draPlugins.delete(pluginName); p != nil { +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + if p, last := draPlugins.remove(pluginName, endpoint); p != nil { + // This logger includes endpoint and pluginName. logger := klog.FromContext(p.backgroundCtx) - logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint) + logger.V(3).Info("Deregister DRA plugin", "lastInstance", last) + if !last { + return + } // Prepare for canceling the background wiping. This needs to run // in the context of the registration handler, the one from diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go index 4920fdf34ec..59f3099ab65 100644 --- a/pkg/kubelet/cm/dra/plugin/registration_test.go +++ b/pkg/kubelet/cm/dra/plugin/registration_test.go @@ -176,6 +176,10 @@ func TestRegistrationHandler(t *testing.T) { // Simulate one existing plugin A. err := handler.RegisterPlugin(pluginA, endpointA, []string{drapb.DRAPluginService}, nil) require.NoError(t, err) + t.Cleanup(func() { + tCtx.Logf("Removing plugin %s", pluginA) + handler.DeRegisterPlugin(pluginA, endpointA) + }) err = handler.ValidatePlugin(test.pluginName, test.endpoint, test.supportedServices) if test.shouldError { @@ -206,9 +210,10 @@ func TestRegistrationHandler(t *testing.T) { assert.NoError(t, err, "recreate slice") } - handler.DeRegisterPlugin(test.pluginName) + tCtx.Logf("Removing plugin %s", test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) // Nop. 
- handler.DeRegisterPlugin(test.pluginName) + handler.DeRegisterPlugin(test.pluginName, test.endpoint) requireNoSlices() }) diff --git a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go index 50b5658af04..cdaa42c0a3a 100644 --- a/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go +++ b/pkg/kubelet/pluginmanager/cache/actual_state_of_world.go @@ -89,6 +89,7 @@ type PluginInfo struct { UUID types.UID Handler PluginHandler Name string + Endpoint string } func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error { diff --git a/pkg/kubelet/pluginmanager/cache/types.go b/pkg/kubelet/pluginmanager/cache/types.go index 0656bc90646..bcffd117ef2 100644 --- a/pkg/kubelet/pluginmanager/cache/types.go +++ b/pkg/kubelet/pluginmanager/cache/types.go @@ -56,5 +56,5 @@ type PluginHandler interface { RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error // DeRegisterPlugin is called once the pluginwatcher observes that the socket has // been deleted. - DeRegisterPlugin(pluginName string) + DeRegisterPlugin(pluginName, endpoint string) } diff --git a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go index 310961c2e64..7b6fbba5b50 100644 --- a/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go +++ b/pkg/kubelet/pluginmanager/operationexecutor/operation_generator.go @@ -118,6 +118,7 @@ func (og *operationGenerator) GenerateRegisterPluginFunc( UUID: pluginUUID, Handler: handler, Name: infoResp.Name, + Endpoint: infoResp.Endpoint, }) if err != nil { klog.ErrorS(err, "RegisterPlugin error -- failed to add plugin", "path", socketPath) @@ -147,7 +148,7 @@ func (og *operationGenerator) GenerateUnregisterPluginFunc( // so that if we receive a register event during Register Plugin, we can process it as a Register call. 
actualStateOfWorldUpdater.RemovePlugin(pluginInfo.SocketPath) - pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name) + pluginInfo.Handler.DeRegisterPlugin(pluginInfo.Name, pluginInfo.Endpoint) klog.V(4).InfoS("DeRegisterPlugin called", "pluginName", pluginInfo.Name, "pluginHandler", pluginInfo.Handler) return nil diff --git a/pkg/kubelet/pluginmanager/plugin_manager_test.go b/pkg/kubelet/pluginmanager/plugin_manager_test.go index 407f821977e..99aa22ad7ae 100644 --- a/pkg/kubelet/pluginmanager/plugin_manager_test.go +++ b/pkg/kubelet/pluginmanager/plugin_manager_test.go @@ -54,7 +54,7 @@ func newFakePluginHandler() *fakePluginHandler { func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "validate "+pluginName) + f.events = append(f.events, "validate "+pluginName+" "+endpoint) return nil } @@ -62,15 +62,15 @@ func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, v func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error { f.Lock() defer f.Unlock() - f.events = append(f.events, "register "+pluginName) + f.events = append(f.events, "register "+pluginName+" "+endpoint) return nil } // DeRegisterPlugin is a fake method -func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) { +func (f *fakePluginHandler) DeRegisterPlugin(pluginName, endpoint string) { f.Lock() defer f.Unlock() - f.events = append(f.events, "deregister "+pluginName) + f.events = append(f.events, "deregister "+pluginName+" "+endpoint) } func (f *fakePluginHandler) Reset() { @@ -93,8 +93,24 @@ func cleanup(t *testing.T) { os.MkdirAll(socketDir, 0755) } -func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName string) { - expected := []string{"validate " + pluginName, "register " + pluginName} +func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"validate " + pluginName + " " + endpoint, "register " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be added to actual state of world cache.", + ) +} + +func waitForDeRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, pluginName, endpoint string) { + t.Helper() + waitFor(t, fakePluginHandler, + []string{"deregister " + pluginName + " " + endpoint}, + "Timed out waiting for plugin to be removed from actual state of the world cache.", + ) +} + +func waitFor(t *testing.T, fakePluginHandler *fakePluginHandler, expected []string, what string) { + t.Helper() err := retryWithExponentialBackOff( 100*time.Millisecond, func() (bool, error) { @@ -108,7 +124,7 @@ func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler, plu }, ) if err != nil { - t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.") + t.Fatal(what) } } @@ -122,7 +138,7 @@ func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.Conditio return wait.ExponentialBackoff(backoff, fn) } -func TestPluginRegistration(t *testing.T) { +func TestPluginManager(t *testing.T) { defer cleanup(t) pluginManager := newTestPluginManager(socketDir) @@ -157,7 +173,12 @@ func TestPluginRegistration(t *testing.T) { require.NoError(t, p.Serve("v1beta1", "v1beta2")) // Verify that the plugin is registered - waitForRegistration(t, fakeHandler, pluginName) + waitForRegistration(t, fakeHandler, pluginName, 
socketPath) + + // And unregister. + fakeHandler.Reset() + require.NoError(t, p.Stop()) + waitForDeRegistration(t, fakeHandler, pluginName, socketPath) } } diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index 5de09c14f9c..87280fa93c7 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -28,8 +28,7 @@ To avoid downtime of a plugin on a node, it would be nice to support running an old plugin in parallel to the new plugin. When deploying with a DaemonSet, setting `maxSurge` to a value larger than zero enables such a seamless upgrade. -**Warning**: Such a seamless upgrade **is not** supported at the moment. This -section merely describes what would have to be changed to make it work. +**Warning**: Such a seamless upgrade is only supported for DRA at the moment. ### In a plugin @@ -65,7 +64,7 @@ isn't perfect because after a kubelet restart, plugin instances get registered in a random order. Restarting the kubelet in the middle of an upgrade should be rare. -At the moment, none of the existing handlers support such seamless upgrades: +At the moment, the following handlers do not support such seamless upgrades: - The device plugin handler suffers from temporarily removing the extended resources during an upgrade. A proposed fix is pending in @@ -78,7 +77,9 @@ At the moment, none of the existing handlers support such seamless upgrades: from the csi-node-registrar as supported version, so this version selection mechanism isn't used at all. -- The DRA handler only remembers the most recently registered instance. +This supports it: + +- DRA ### Deployment diff --git a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go index 41d8b46f4cd..68e72ddcaae 100644 --- a/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go +++ b/pkg/kubelet/pluginmanager/reconciler/reconciler_test.go @@ -132,7 +132,7 @@ func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions } // DeRegisterPlugin is a dummy implementation -func (d *DummyImpl) DeRegisterPlugin(pluginName string) { +func (d *DummyImpl) DeRegisterPlugin(pluginName, endpoint string) { } // Calls Run() diff --git a/pkg/volume/csi/csi_plugin.go b/pkg/volume/csi/csi_plugin.go index 59fa6245b7f..ceefa3ff7c1 100644 --- a/pkg/volume/csi/csi_plugin.go +++ b/pkg/volume/csi/csi_plugin.go @@ -187,8 +187,8 @@ func (h *RegistrationHandler) validateVersions(callerName, pluginName string, en // DeRegisterPlugin is called when a plugin removed its socket, signaling // it is no longer available -func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) { - klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName)) +func (h *RegistrationHandler) DeRegisterPlugin(pluginName, endpoint string) { + klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s, endpoint %s", pluginName, endpoint)) if err := unregisterDriver(pluginName); err != nil { klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err)) } From 582b421393d0fad2ad4a83feba88977ac4434662 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 13 Mar 2025 12:03:10 +0100 Subject: [PATCH 4/4] DRA kubeletplugin: add RollingUpdate When the new RollingUpdate option is used, the DRA driver gets deployed such that it uses unique socket paths and uses file locking to serialize gRPC calls. 
This enables the kubelet to pick arbitrarily between two concurrently instances. The handover is seamless (no downtime, no removal of ResourceSlices by the kubelet). For file locking, the fileutils package from etcd is used because that was already a Kubernetes dependency. Unfortunately that package brings in some additional indirect dependency for DRA drivers (zap, multierr), but those seem acceptable. --- .../pluginmanager/pluginwatcher/README.md | 5 + .../k8s.io/dynamic-resource-allocation/go.mod | 3 + .../k8s.io/dynamic-resource-allocation/go.sum | 4 + .../kubeletplugin/draplugin.go | 131 +++++++++++++++--- test/e2e/dra/deploy.go | 27 +++- test/e2e/dra/dra.go | 87 +++++++++++- test/e2e/dra/test-driver/app/kubeletplugin.go | 1 - 7 files changed, 235 insertions(+), 23 deletions(-) diff --git a/pkg/kubelet/pluginmanager/pluginwatcher/README.md b/pkg/kubelet/pluginmanager/pluginwatcher/README.md index 87280fa93c7..9403829a2fb 100644 --- a/pkg/kubelet/pluginmanager/pluginwatcher/README.md +++ b/pkg/kubelet/pluginmanager/pluginwatcher/README.md @@ -32,6 +32,11 @@ setting `maxSurge` to a value larger than zero enables such a seamless upgrade. ### In a plugin +*Note*: For DRA, the +[k8s.io/dynamic-resource-allocation](https://pkg.go.dev/k8s.io/dynamic-resource-allocation/kubeletplugin) +helper package offers the `RollingUpdate` option which implements the socket +handling as described in this section. + To support seamless upgrades, each plugin instance must use a unique socket filename. Otherwise the following could happen: - The old instance is registered with `plugin.example.com-reg.sock`. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.mod b/staging/src/k8s.io/dynamic-resource-allocation/go.mod index b152fb0bbfb..451a9332d4a 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.mod +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/go-cmp v0.7.0 github.com/onsi/gomega v1.35.1 github.com/stretchr/testify v1.10.0 + go.etcd.io/etcd/client/pkg/v3 v3.5.16 google.golang.org/grpc v1.68.1 k8s.io/api v0.0.0 k8s.io/apimachinery v0.0.0 @@ -57,6 +58,8 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.opentelemetry.io/otel v1.33.0 // indirect go.opentelemetry.io/otel/trace v1.33.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.27.0 // indirect diff --git a/staging/src/k8s.io/dynamic-resource-allocation/go.sum b/staging/src/k8s.io/dynamic-resource-allocation/go.sum index 991e9276662..aae134cac79 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/go.sum +++ b/staging/src/k8s.io/dynamic-resource-allocation/go.sum @@ -158,6 +158,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= go.etcd.io/etcd/api/v3 v3.5.16/go.mod h1:1P4SlIP/VwkDmGo3OlOD7faPeP8KDIFhqvciH5EfN28= +go.etcd.io/etcd/client/pkg/v3 v3.5.16 h1:ZgY48uH6UvB+/7R9Yf4x574uCO3jIx0TRDyetSfId3Q= go.etcd.io/etcd/client/pkg/v3 v3.5.16/go.mod h1:V8acl8pcEK0Y2g19YlOV9m9ssUe6MgiDSobSoaBAM0E= go.etcd.io/etcd/client/v2 v2.305.16/go.mod h1:h9YxWCzcdvZENbfzBTFCnoNumr2ax3F19sKMqHFmXHE= go.etcd.io/etcd/client/v3 v3.5.16/go.mod h1:X+rExSGkyqxvu276cr2OwPLBaeqFu1cIl4vmRjAD/50= @@ -176,9 +177,12 @@ 
go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJ go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index 57602cdaee0..92f1af253e1 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -21,12 +21,14 @@ import ( "errors" "fmt" "net" + "os" "path" "sync" "google.golang.org/grpc" "k8s.io/klog/v2" + "go.etcd.io/etcd/client/pkg/v3/fileutil" resourceapi "k8s.io/api/resource/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -195,8 +197,10 @@ func RegistrarDirectoryPath(path string) Option { // support updates from an installation which used an older release of // of the helper code. // -// The default is -reg.sock. When rolling updates are enabled (not supported yet), +// The default is -reg.sock. When rolling updates are enabled, // it is --reg.sock. +// +// This option and [RollingUpdate] are mutually exclusive. func RegistrarSocketFilename(name string) Option { return func(o *options) error { o.pluginRegistrationEndpoint.file = name @@ -248,6 +252,44 @@ func PluginListener(listen func(ctx context.Context, path string) (net.Listener, } } +// RollingUpdate can be used to enable support for running two plugin instances +// in parallel while a newer instance replaces the older. When enabled, both +// instances must share the same plugin data directory and driver name. +// They create different sockets to allow the kubelet to connect to both at +// the same time. +// +// There is no guarantee which of the two instances are used by kubelet. +// For example, it can happen that a claim gets prepared by one instance +// and then needs to be unprepared by the other. Kubelet then may fall back +// to the first one again for some other operation. In practice this means +// that each instance must be entirely stateless across method calls. +// Serialization (on by default, see [Serialize]) ensures that methods +// are serialized across all instances through file locking. The plugin +// implementation can load shared state from a file at the start +// of a call, execute and then store the updated shared state again. +// +// Passing a non-empty uid enables rolling updates, an empty uid disables it. +// The uid must be the pod UID. 
A DaemonSet can pass that into the driver container +// via the downward API (https://kubernetes.io/docs/concepts/workloads/pods/downward-api/#downwardapi-fieldRef). +// +// Because new instances cannot remove stale sockets of older instances, +// it is important that each pod shuts down cleanly: it must catch SIGINT/TERM +// and stop the helper instead of quitting immediately. +// +// This depends on support in the kubelet which was added in Kubernetes 1.33. +// Don't use this if it is not certain that the kubelet has that support! +// +// This option and [RegistrarSocketFilename] are mutually exclusive. +func RollingUpdate(uid types.UID) Option { + return func(o *options) error { + o.rollingUpdateUID = uid + + // TODO: ask the kubelet whether that pod is still running and + // clean up leftover sockets? + return nil + } +} + // GRPCInterceptor is called for each incoming gRPC method call. This option // may be used more than once and each interceptor will get called. func GRPCInterceptor(interceptor grpc.UnaryServerInterceptor) Option { @@ -322,6 +364,17 @@ func Serialize(enabled bool) Option { } } +// FlockDir changes where lock files are created and locked. A lock file +// is needed when serializing gRPC calls and rolling updates are enabled. +// The directory must exist and be reserved for exclusive use by the +// driver. The default is the plugin data directory. +func FlockDirectoryPath(path string) Option { + return func(o *options) error { + o.flockDirectoryPath = path + return nil + } +} + type options struct { logger klog.Logger grpcVerbosity int @@ -330,11 +383,13 @@ type options struct { nodeUID types.UID pluginRegistrationEndpoint endpoint pluginDataDirectoryPath string + rollingUpdateUID types.UID draEndpointListen func(ctx context.Context, path string) (net.Listener, error) unaryInterceptors []grpc.UnaryServerInterceptor streamInterceptors []grpc.StreamServerInterceptor kubeClient kubernetes.Interface serialize bool + flockDirectoryPath string nodeV1beta1 bool } @@ -344,17 +399,18 @@ type Helper struct { // backgroundCtx is for activities that are started later. backgroundCtx context.Context // cancel cancels the backgroundCtx. - cancel func(cause error) - wg sync.WaitGroup - registrar *nodeRegistrar - pluginServer *grpcServer - plugin DRAPlugin - driverName string - nodeName string - nodeUID types.UID - kubeClient kubernetes.Interface - serialize bool - grpcMutex sync.Mutex + cancel func(cause error) + wg sync.WaitGroup + registrar *nodeRegistrar + pluginServer *grpcServer + plugin DRAPlugin + driverName string + nodeName string + nodeUID types.UID + kubeClient kubernetes.Interface + serialize bool + grpcMutex sync.Mutex + grpcLockFilePath string // Information about resource publishing changes concurrently and thus // must be protected by the mutex. 
@@ -392,12 +448,20 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
 	if o.driverName == "" {
 		return nil, errors.New("driver name must be set")
 	}
+	if o.rollingUpdateUID != "" && o.pluginRegistrationEndpoint.file != "" {
+		return nil, errors.New("rolling updates and explicit registration socket filename are mutually exclusive")
+	}
+	uidPart := ""
+	if o.rollingUpdateUID != "" {
+		uidPart = "-" + string(o.rollingUpdateUID)
+	}
 	if o.pluginRegistrationEndpoint.file == "" {
-		o.pluginRegistrationEndpoint.file = o.driverName + "-reg.sock"
+		o.pluginRegistrationEndpoint.file = o.driverName + uidPart + "-reg.sock"
 	}
 	if o.pluginDataDirectoryPath == "" {
 		o.pluginDataDirectoryPath = path.Join(KubeletPluginsDir, o.driverName)
 	}
+
 	d := &Helper{
 		driverName: o.driverName,
 		nodeName:   o.nodeName,
@@ -406,6 +470,14 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
 		serialize:  o.serialize,
 		plugin:     plugin,
 	}
+	if o.rollingUpdateUID != "" {
+		dir := o.pluginDataDirectoryPath
+		if o.flockDirectoryPath != "" {
+			dir = o.flockDirectoryPath
+		}
+		// Enable file locking, required for concurrently running pods.
+		d.grpcLockFilePath = path.Join(dir, "serialize.lock")
+	}
 
 	// Stop calls cancel and therefore both cancellation
 	// and Stop cause goroutines to stop.
@@ -434,7 +506,7 @@ func Start(ctx context.Context, plugin DRAPlugin, opts ...Option) (result *Helpe
 	var supportedServices []string
 	draEndpoint := endpoint{
 		dir:        o.pluginDataDirectoryPath,
-		file:       "dra.sock", // "dra" is hard-coded.
+		file:       "dra" + uidPart + ".sock", // "dra" is hard-coded. The directory is unique per driver, so the path is unique even without the UID; the UID distinguishes parallel instances during a rolling update.
 		listenFunc: o.draEndpointListen,
 	}
 	pluginServer, err := startGRPCServer(klog.LoggerWithName(logger, "dra"), o.grpcVerbosity, o.unaryInterceptors, o.streamInterceptors, draEndpoint, func(grpcServer *grpc.Server) {
@@ -575,12 +647,25 @@ func (d *Helper) RegistrationStatus() *registerapi.RegistrationStatus {
 // serializeGRPCIfEnabled locks a mutex if serialization is enabled.
 // Either way it returns a method that the caller must invoke
 // via defer.
-func (d *Helper) serializeGRPCIfEnabled() func() {
+func (d *Helper) serializeGRPCIfEnabled() (func(), error) {
 	if !d.serialize {
-		return func() {}
+		return func() {}, nil
 	}
+
+	// If rolling updates are enabled, we cannot rely on in-memory locking alone.
+	// We must use file locking.
+	if d.grpcLockFilePath != "" {
+		file, err := fileutil.LockFile(d.grpcLockFilePath, os.O_RDWR|os.O_CREATE, 0666)
+		if err != nil {
+			return nil, fmt.Errorf("lock file: %w", err)
+		}
+		return func() {
+			_ = file.Close()
+		}, nil
+	}
+
 	d.grpcMutex.Lock()
-	return d.grpcMutex.Unlock
+	return d.grpcMutex.Unlock, nil
 }
 
 // nodePluginImplementation is a thin wrapper around the helper instance.
@@ -597,7 +682,11 @@ func (d *nodePluginImplementation) NodePrepareResources(ctx context.Context, req
 		return nil, fmt.Errorf("get resource claims: %w", err)
 	}
 
-	defer d.serializeGRPCIfEnabled()()
+	unlock, err := d.serializeGRPCIfEnabled()
+	if err != nil {
+		return nil, fmt.Errorf("serialize gRPC: %w", err)
+	}
+	defer unlock()
 
 	result, err := d.plugin.PrepareResourceClaims(ctx, claims)
 	if err != nil {
@@ -659,7 +748,11 @@ func (d *nodePluginImplementation) getResourceClaims(ctx context.Context, claims
 
 // NodeUnprepareResources implements [draapi.NodeUnprepareResources].
 func (d *nodePluginImplementation) NodeUnprepareResources(ctx context.Context, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
-	defer d.serializeGRPCIfEnabled()
+	unlock, err := d.serializeGRPCIfEnabled()
+	if err != nil {
+		return nil, fmt.Errorf("serialize gRPC: %w", err)
+	}
+	defer unlock()
 
 	claims := make([]NamespacedObject, 0, len(req.Claims))
 	for _, claim := range req.Claims {
diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go
index a451be4af68..ba6ef51fa3b 100644
--- a/test/e2e/dra/deploy.go
+++ b/test/e2e/dra/deploy.go
@@ -77,6 +77,7 @@ const (
 
 type Nodes struct {
 	NodeNames []string
+	tempDir   string
 }
 
 type Resources struct {
@@ -112,6 +113,8 @@ func NewNodesNow(ctx context.Context, f *framework.Framework, minNodes, maxNodes
 }
 
 func (nodes *Nodes) init(ctx context.Context, f *framework.Framework, minNodes, maxNodes int) {
+	nodes.tempDir = ginkgo.GinkgoT().TempDir()
+
 	ginkgo.By("selecting nodes")
 	// The kubelet plugin is harder. We deploy the builtin manifest
 	// after patching in the driver name and all nodes on which we
@@ -219,6 +222,12 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
 	ginkgo.DeferCleanup(d.TearDown)
 }
 
+// NewGetSlices generates a function for gomega.Eventually/Consistently which
+// returns the ResourceSliceList.
+func (d *Driver) NewGetSlices() framework.GetFunc[*resourceapi.ResourceSliceList] {
+	return framework.ListObjects(d.f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
+}
+
 type MethodInstance struct {
 	Nodename   string
 	FullMethod string
@@ -240,6 +249,10 @@ type Driver struct {
 	// The socket path is still the same.
 	InstanceSuffix string
 
+	// RollingUpdate can be set to true to enable using different socket names
+	// for different pods and thus seamless upgrades. Must be supported by the kubelet!
+	RollingUpdate bool
+
 	// Name gets derived automatically from the current test namespace and
 	// (if set) the NameSuffix while setting up the driver for a test.
 	Name string
@@ -351,7 +364,6 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	numNodes := int32(len(nodes.NodeNames))
 	pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
 	registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
-	registrarSocketFilename := d.Name + "-reg.sock"
 	instanceName := d.Name + d.InstanceSuffix
 	err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
 		switch item := item.(type) {
@@ -447,6 +459,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 			// All listeners running in this pod use a new unique local port number
 			// by atomically incrementing this variable.
 			listenerPort := int32(9000)
+			rollingUpdateUID := pod.UID
+			serialize := true
+			if !d.RollingUpdate {
+				rollingUpdateUID = ""
+				// A test might have to execute two gRPC calls in parallel, so only
+				// serialize when we explicitly want to test a rolling update.
+				serialize = false
+			}
 			plugin, err := app.StartPlugin(loggerCtx, "/cdi", d.Name, driverClient, nodename, fileOps,
 				kubeletplugin.GRPCVerbosity(0),
 				kubeletplugin.GRPCInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
@@ -456,11 +476,14 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 					return d.streamInterceptor(nodename, srv, ss, info, handler)
 				}),
+				kubeletplugin.RollingUpdate(rollingUpdateUID),
+				kubeletplugin.Serialize(serialize),
+				kubeletplugin.FlockDirectoryPath(nodes.tempDir),
+
 				kubeletplugin.PluginDataDirectoryPath(pluginDataDirectoryPath),
 				kubeletplugin.PluginListener(listen(d.f, &pod, &listenerPort)),
 				kubeletplugin.RegistrarDirectoryPath(registrarDirectoryPath),
-				kubeletplugin.RegistrarSocketFilename(registrarSocketFilename),
 				kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
 			)
 			framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index 80fd5806cb2..091cfdd6fd7 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -1802,6 +1802,91 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 			framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
 		})
 
+		ginkgo.It("rolling update", func(ctx context.Context) {
+			nodes := NewNodesNow(ctx, f, 1, 1)
+
+			oldDriver := NewDriverInstance(f)
+			oldDriver.InstanceSuffix = "-old"
+			oldDriver.RollingUpdate = true
+			oldDriver.Run(nodes, perNode(1, nodes))
+
+			// We expect one ResourceSlice per node from the driver.
+			getSlices := oldDriver.NewGetSlices()
+			gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
+			initialSlices, err := getSlices(ctx)
+			framework.ExpectNoError(err)
+
+			// Same driver name, different socket paths because of rolling update.
+			newDriver := NewDriverInstance(f)
+			newDriver.InstanceSuffix = "-new"
+			newDriver.RollingUpdate = true
+			newDriver.Run(nodes, perNode(1, nodes))
+
+			// Stop old driver instance.
+			oldDriver.TearDown(ctx)
+
+			// The builder behaves the same for both driver instances.
+			b := newBuilderNow(ctx, f, oldDriver)
+			claim := b.externalClaim()
+			pod := b.podExternal()
+			b.create(ctx, claim, pod)
+			b.testPod(ctx, f, pod)
+
+			// The exact same slices should still exist.
+			finalSlices, err := getSlices(ctx)
+			framework.ExpectNoError(err)
+			gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items))
+
+			// We need to clean up explicitly because the normal
+			// cleanup doesn't work (driver shuts down first).
+			// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
+			framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
+			framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
+		})
+
+		ginkgo.It("failed update", func(ctx context.Context) {
+			nodes := NewNodesNow(ctx, f, 1, 1)
+
+			oldDriver := NewDriverInstance(f)
+			oldDriver.InstanceSuffix = "-old"
+			oldDriver.RollingUpdate = true
+			oldDriver.Run(nodes, perNode(1, nodes))
+
+			// We expect one ResourceSlice per node from the driver.
+			getSlices := oldDriver.NewGetSlices()
+			gomega.Eventually(ctx, getSlices).Should(gomega.HaveField("Items", gomega.HaveLen(len(nodes.NodeNames))))
+			initialSlices, err := getSlices(ctx)
+			framework.ExpectNoError(err)
+
+			// Same driver name, different socket paths because of rolling update.
+			newDriver := NewDriverInstance(f)
+			newDriver.InstanceSuffix = "-new"
+			newDriver.RollingUpdate = true
+			newDriver.Run(nodes, perNode(1, nodes))
+
+			// Stop the new driver instance, simulating a failed update.
+			// The kubelet should fall back to the old instance, which is still registered.
+			newDriver.TearDown(ctx)
+
+			// The builder behaves the same for both driver instances.
+			b := newBuilderNow(ctx, f, oldDriver)
+			claim := b.externalClaim()
+			pod := b.podExternal()
+			b.create(ctx, claim, pod)
+			b.testPod(ctx, f, pod)
+
+			// The exact same slices should still exist.
+			finalSlices, err := getSlices(ctx)
+			framework.ExpectNoError(err)
+			gomega.Expect(finalSlices.Items).Should(gomega.Equal(initialSlices.Items))
+
+			// We need to clean up explicitly because the normal
+			// cleanup doesn't work (driver shuts down first).
+			// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
+			framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
+			framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
+		})
+
 		f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) {
 			nodes := NewNodesNow(ctx, f, 1, 1)
 
@@ -1823,7 +1908,7 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 
 			// "Update" the driver by taking it down and bringing up a new one.
 			// Pods never run in parallel, similar to how a DaemonSet would update
-			// its pods.
+			// its pods when maxSurge is zero.
 			ginkgo.By("reinstall driver")
 			start := time.Now()
 			oldDriver.TearDown(ctx)
diff --git a/test/e2e/dra/test-driver/app/kubeletplugin.go b/test/e2e/dra/test-driver/app/kubeletplugin.go
index 498e02fc715..875fdf886d8 100644
--- a/test/e2e/dra/test-driver/app/kubeletplugin.go
+++ b/test/e2e/dra/test-driver/app/kubeletplugin.go
@@ -168,7 +168,6 @@ func StartPlugin(ctx context.Context, cdiDir, driverName string, kubeClient kube
 		kubeletplugin.KubeClient(kubeClient),
 		kubeletplugin.GRPCInterceptor(ex.recordGRPCCall),
 		kubeletplugin.GRPCStreamInterceptor(ex.recordGRPCStream),
-		kubeletplugin.Serialize(false), // The example plugin does its own locking.
 	)
 	d, err := kubeletplugin.Start(ctx, ex, opts...)
 	if err != nil {
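The RollingUpdate and FlockDirectoryPath options added above are easier to follow with a short usage sketch. The Go snippet below is not part of the patch: it assumes the helper API introduced here, treats DriverName and NodeName as assumed option names, and invents a POD_UID environment variable (filled via the downward API) plus an example driver name and lock directory.

package example

import (
	"context"
	"fmt"
	"os"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// startWithRollingUpdate starts the kubelet plugin helper so that two pods of
// the same DaemonSet can be registered with the kubelet in parallel during an
// update. The pod UID is expected in POD_UID, injected via the downward API
// (fieldRef: metadata.uid); the variable name is a choice made for this sketch.
func startWithRollingUpdate(ctx context.Context, plugin kubeletplugin.DRAPlugin, kubeClient kubernetes.Interface, nodeName string) (*kubeletplugin.Helper, error) {
	podUID := types.UID(os.Getenv("POD_UID"))
	if podUID == "" {
		return nil, fmt.Errorf("POD_UID must be set when rolling updates are enabled")
	}
	return kubeletplugin.Start(ctx, plugin,
		kubeletplugin.DriverName("dra.example.com"), // assumed option name
		kubeletplugin.NodeName(nodeName),            // assumed option name
		kubeletplugin.KubeClient(kubeClient),
		// Per-instance registration and DRA sockets; requires a kubelet
		// with support for seamless upgrades.
		kubeletplugin.RollingUpdate(podUID),
		// Serialize gRPC calls across both instances through file locking.
		kubeletplugin.Serialize(true),
		// Optional: keep the lock file outside the plugin data directory.
		kubeletplugin.FlockDirectoryPath("/var/run/dra-example"),
	)
}

A DaemonSet deploying such a driver would additionally set maxSurge above zero and inject metadata.uid into the container environment.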
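The "load shared state, execute, store it again" pattern that the RollingUpdate comment asks for can be sketched in the same way. Assuming Serialize(true), the helper's file lock already guards each call, so a plugin can keep shared data in a single file and re-read it on every call; the state file and its layout below are made up for illustration.

package example

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
)

// withSharedState loads the shared state file (if any), lets update modify it,
// and writes it back so that the other plugin instance sees the result on its
// next call. It must only be called while the helper holds the gRPC lock.
func withSharedState(stateFile string, update func(prepared map[string][]string) error) error {
	prepared := make(map[string][]string) // e.g. claim UID -> CDI device IDs
	data, err := os.ReadFile(stateFile)
	switch {
	case err == nil:
		if err := json.Unmarshal(data, &prepared); err != nil {
			return fmt.Errorf("decode shared state: %w", err)
		}
	case !errors.Is(err, os.ErrNotExist):
		return fmt.Errorf("read shared state: %w", err)
	}

	if err := update(prepared); err != nil {
		return err
	}

	data, err = json.Marshal(prepared)
	if err != nil {
		return fmt.Errorf("encode shared state: %w", err)
	}
	return os.WriteFile(stateFile, data, 0o600)
}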