diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index 8e965c6b694..26043a0fd28 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
 }
 
 func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
-	return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
+	// The time that DRA drivers have to come back after being unregistered
+	// before the kubelet removes their ResourceSlices.
+	//
+	// This must be long enough to actually allow stopping a pod and
+	// starting the replacement (otherwise ResourceSlices get deleted
+	// unnecessarily) and not too long (otherwise the time window where
+	// pods might still get scheduled to the node after removal of a
+	// driver is too long).
+	//
+	// 30 seconds might be long enough for a simple container restart.
+	// If a DRA driver wants to be sure that slices don't get wiped,
+	// it should use rolling updates.
+	wipingDelay := 30 * time.Second
+	return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay))
 }
 
 // Start starts the reconcile loop of the manager.
diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go
index bc2448690a8..908cd735847 100644
--- a/pkg/kubelet/cm/dra/manager_test.go
+++ b/pkg/kubelet/cm/dra/manager_test.go
@@ -580,7 +580,7 @@ func TestPrepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
@@ -717,7 +717,7 @@ func TestUnprepareResources(t *testing.T) {
 			}
 			defer draServerInfo.teardownFn()
 
-			plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+			plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
 			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
@@ -887,7 +887,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
 	}
 	defer draServerInfo.teardownFn()
 
-	plg := plugin.NewRegistrationHandler(nil, getFakeNode)
+	plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
 	if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
 		t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
 	}
diff --git a/pkg/kubelet/cm/dra/plugin/registration.go b/pkg/kubelet/cm/dra/plugin/registration.go
index 23ddbb73a70..2af7db5bc3b 100644
--- a/pkg/kubelet/cm/dra/plugin/registration.go
+++ b/pkg/kubelet/cm/dra/plugin/registration.go
@@ -21,6 +21,7 @@ import (
 	"errors"
 	"fmt"
 	"slices"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -53,6 +54,18 @@ type RegistrationHandler struct {
 	backgroundCtx context.Context
 	kubeClient    kubernetes.Interface
 	getNode       func() (*v1.Node, error)
+	wipingDelay   time.Duration
+
+	mutex sync.Mutex
+
+	// pendingWipes maps a plugin name to a cancel function for
+	// wiping of that plugin's ResourceSlices. Entries get added
+	// in DeRegisterPlugin and checked in RegisterPlugin. If
+	// wiping is pending during RegisterPlugin, it gets canceled.
+	//
+	// Must use pointers to functions because the entries have to
+	// be comparable.
+	pendingWipes map[string]*context.CancelCauseFunc
 }
 
 var _ cache.PluginHandler = &RegistrationHandler{}
@@ -62,12 +75,14 @@ var _ cache.PluginHandler = &RegistrationHandler{}
 // Must only be called once per process because it manages global state.
 // If a kubeClient is provided, then it synchronizes ResourceSlices
 // with the resource information provided by plugins.
-func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
+func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
 	handler := &RegistrationHandler{
 		// The context and thus logger should come from the caller.
 		backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
 		kubeClient:    kubeClient,
 		getNode:       getNode,
+		wipingDelay:   wipingDelay,
+		pendingWipes:  make(map[string]*context.CancelCauseFunc),
 	}
 
 	// When kubelet starts up, no DRA driver has registered yet. None of
@@ -77,19 +92,34 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
 	// to start up.
 	//
 	// This has to run in the background.
-	go handler.wipeResourceSlices("")
+	logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
+	ctx := klog.NewContext(handler.backgroundCtx, logger)
+	go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
 
 	return handler
 }
 
 // wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
-func (h *RegistrationHandler) wipeResourceSlices(driver string) {
+// Wiping will delay for a while and can be canceled by canceling the context.
+func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) {
 	if h.kubeClient == nil {
 		return
 	}
-	ctx := h.backgroundCtx
 	logger := klog.FromContext(ctx)
 
+	if delay != 0 {
+		// Before we start deleting, give the driver time to bounce back.
+		// Perhaps it got removed as part of a DaemonSet update and the
+		// replacement pod is about to start.
+		logger.V(4).Info("Starting to wait before wiping ResourceSlices", "delay", delay)
+		select {
+		case <-ctx.Done():
+			logger.V(4).Info("Aborting wiping of ResourceSlices", "reason", context.Cause(ctx))
+		case <-time.After(delay):
+			logger.V(4).Info("Starting to wipe ResourceSlices after waiting", "delay", delay)
+		}
+	}
+
 	backoff := wait.Backoff{
 		Duration: time.Second,
 		Factor:   2,
@@ -184,6 +214,15 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
 		logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint)
 	}
 
+	// Now cancel any pending ResourceSlice wiping for this plugin.
+	// Only needs to be done once.
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if cancel := h.pendingWipes[pluginName]; cancel != nil {
+		(*cancel)(errors.New("new plugin instance registered"))
+		delete(h.pendingWipes, pluginName)
+	}
+
 	return nil
 }
 
@@ -225,11 +264,42 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
 	logger := klog.FromContext(p.backgroundCtx)
 	logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
 
+	// Prepare for canceling the background wiping. This needs to run
+	// in the context of the registration handler because the one from
+	// the plugin is canceled.
+	logger = klog.FromContext(h.backgroundCtx)
+	logger = klog.LoggerWithName(logger, "driver-cleanup")
+	logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
+	ctx, cancel := context.WithCancelCause(h.backgroundCtx)
+	ctx = klog.NewContext(ctx, logger)
+
 	// Clean up the ResourceSlices for the deleted Plugin since it
 	// may have died without doing so itself and might never come
 	// back.
-	go h.wipeResourceSlices(pluginName)
+	//
+	// May get canceled if the plugin comes back quickly enough
+	// (see RegisterPlugin).
+	h.mutex.Lock()
+	defer h.mutex.Unlock()
+	if cancel := h.pendingWipes[pluginName]; cancel != nil {
+		(*cancel)(errors.New("plugin deregistered a second time"))
+	}
+	h.pendingWipes[pluginName] = &cancel
+	go func() {
+		defer func() {
+			h.mutex.Lock()
+			defer h.mutex.Unlock()
+
+			// Cancel our own context, but remove it from the map only if it
+			// is the current entry. Perhaps it already got replaced.
+			cancel(errors.New("wiping done"))
+			if h.pendingWipes[pluginName] == &cancel {
+				delete(h.pendingWipes, pluginName)
+			}
+		}()
+		h.wipeResourceSlices(ctx, h.wipingDelay, pluginName)
+	}()
 
 	return
 }
diff --git a/pkg/kubelet/cm/dra/plugin/registration_test.go b/pkg/kubelet/cm/dra/plugin/registration_test.go
index 949682f6aab..4920fdf34ec 100644
--- a/pkg/kubelet/cm/dra/plugin/registration_test.go
+++ b/pkg/kubelet/cm/dra/plugin/registration_test.go
@@ -157,7 +157,7 @@ func TestRegistrationHandler(t *testing.T) {
 			}
 
 			// The handler wipes all slices at startup.
-			handler := NewRegistrationHandler(client, getFakeNode)
+			handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */)
 			requireNoSlices := func() {
 				t.Helper()
 				if client == nil {
diff --git a/test/e2e/dra/deploy.go b/test/e2e/dra/deploy.go
index 37fc5226e0e..a451be4af68 100644
--- a/test/e2e/dra/deploy.go
+++ b/test/e2e/dra/deploy.go
@@ -66,6 +66,7 @@ import (
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
 	"k8s.io/kubernetes/test/e2e/storage/utils"
+	"k8s.io/utils/ptr"
 	"sigs.k8s.io/yaml"
 )
 
@@ -214,7 +215,6 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
 		// not run on all nodes.
 		resources.Nodes = nodes.NodeNames
 	}
-	ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
 	d.SetUp(nodes, resources, devicesPerNode...)
 	ginkgo.DeferCleanup(d.TearDown)
 }
@@ -227,12 +227,22 @@ type MethodInstance struct {
 type Driver struct {
 	f       *framework.Framework
 	ctx     context.Context
-	cleanup []func() // executed first-in-first-out
+	cleanup []func(context.Context) // executed first-in-first-out
 	wg      sync.WaitGroup
 
 	serviceAccountName string
 
+	// NameSuffix can be set while registering a test to deploy different
+	// drivers in the same test namespace.
 	NameSuffix string
-	Name       string
+
+	// InstanceSuffix can be set while registering a test to deploy two different
+	// instances of the same driver. Used to generate unique objects in the API server.
+	// The socket path is still the same.
+	InstanceSuffix string
+
+	// Name gets derived automatically from the current test namespace and
+	// (if set) the NameSuffix while setting up the driver for a test.
+	Name string
 
 	// Nodes contains entries for each node selected for a test when the test runs.
 	// In addition, there is one entry for a fictional node.
@@ -263,9 +273,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	ctx, cancel := context.WithCancel(context.Background())
 	logger := klog.FromContext(ctx)
 	logger = klog.LoggerWithValues(logger, "driverName", d.Name)
+	if d.InstanceSuffix != "" {
+		instance, _ := strings.CutPrefix(d.InstanceSuffix, "-")
+		logger = klog.LoggerWithValues(logger, "instance", instance)
+	}
 	ctx = klog.NewContext(ctx, logger)
 	d.ctx = ctx
-	d.cleanup = append(d.cleanup, cancel)
+	d.cleanup = append(d.cleanup, func(context.Context) { cancel() })
 
 	if !resources.NodeLocal {
 		// Publish one resource pool with "network-attached" devices.
@@ -323,28 +337,32 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 	}
 
 	// Create service account and corresponding RBAC rules.
-	d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account"
+	d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account"
 	content := pluginPermissions
 	content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
-	content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name)
+	content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix)
 	d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
 
+	// Using a ReplicaSet instead of a DaemonSet has the advantage that we can control
+	// the lifecycle explicitly, in particular running two pods per node long enough to
+	// run checks.
 	instanceKey := "app.kubernetes.io/instance"
 	rsName := ""
 	numNodes := int32(len(nodes.NodeNames))
 	pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
 	registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
 	registrarSocketFilename := d.Name + "-reg.sock"
+	instanceName := d.Name + d.InstanceSuffix
 	err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
 		switch item := item.(type) {
 		case *appsv1.ReplicaSet:
-			item.Name += d.NameSuffix
+			item.Name += d.NameSuffix + d.InstanceSuffix
 			rsName = item.Name
 			item.Spec.Replicas = &numNodes
-			item.Spec.Selector.MatchLabels[instanceKey] = d.Name
-			item.Spec.Template.Labels[instanceKey] = d.Name
+			item.Spec.Selector.MatchLabels[instanceKey] = instanceName
+			item.Spec.Template.Labels[instanceKey] = instanceName
 			item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName
-			item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
+			item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = instanceName
 			item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
 				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
 					NodeSelectorTerms: []v1.NodeSelectorTerm{
@@ -376,7 +394,7 @@
 	if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
 		framework.ExpectNoError(err, "all kubelet plugin proxies running")
 	}
-	requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
+	requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName})
 	framework.ExpectNoError(err, "create label selector requirement")
 	selector := labels.NewSelector().Add(*requirement)
 	pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
@@ -446,9 +464,20 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
 			kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
 		)
 		framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
-		d.cleanup = append(d.cleanup, func() {
+		d.cleanup = append(d.cleanup, func(ctx context.Context) {
 			// Depends on cancel being called first.
 			plugin.Stop()
+
+			// Also explicitly stop all pods.
+			ginkgo.By("scaling down driver proxy pods for " + d.Name)
+			rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
+			framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
+			rs.Spec.Replicas = ptr.To(int32(0))
+			rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{})
+			framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
+			if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil {
+				framework.ExpectNoError(err, "all kubelet plugin proxies stopped")
+			}
 		})
 		d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
 	}
@@ -717,14 +746,19 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
 	return writer
 }
 
-func (d *Driver) TearDown() {
+func (d *Driver) TearDown(ctx context.Context) {
 	for _, c := range d.cleanup {
-		c()
+		c(ctx)
 	}
 	d.cleanup = nil
 	d.wg.Wait()
}
 
+// IsGone checks that the kubelet is done with the driver.
+// This is done by waiting for the kubelet to remove the
+// driver's ResourceSlices, which takes at least 5 minutes
+// because of the delay in the kubelet. Only use this in slow
+// tests...
 func (d *Driver) IsGone(ctx context.Context) {
 	gomega.Eventually(ctx, func(ctx context.Context) ([]resourceapi.ResourceSlice, error) {
 		slices, err := d.f.ClientSet.ResourceV1beta1().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
@@ -732,7 +766,7 @@ func (d *Driver) IsGone(ctx context.Context) {
 			return nil, err
 		}
 		return slices.Items, err
-	}).Should(gomega.BeEmpty())
+	}).WithTimeout(7 * time.Minute).Should(gomega.BeEmpty())
 }
 
 func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go
index b93c44a9b1e..80fd5806cb2 100644
--- a/test/e2e/dra/dra.go
+++ b/test/e2e/dra/dra.go
@@ -1801,6 +1801,65 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
 		framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
 		framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
 	})
+
+	f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) {
+		nodes := NewNodesNow(ctx, f, 1, 1)
+
+		// Same driver name, same socket path.
+		oldDriver := NewDriverInstance(f)
+		oldDriver.InstanceSuffix = "-old"
+		oldDriver.Run(nodes, perNode(1, nodes))
+
+		// Collect set of resource slices for that driver.
+		listSlices := framework.ListObjects(f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{
+			FieldSelector: "spec.driver=" + oldDriver.Name,
+		})
+		gomega.Eventually(ctx, listSlices).Should(gomega.HaveField("Items", gomega.Not(gomega.BeEmpty())), "driver should have published ResourceSlices, got none")
+		oldSlices, err := listSlices(ctx)
+		framework.ExpectNoError(err, "list slices published by old driver")
+		if len(oldSlices.Items) == 0 {
+			framework.Fail("driver should have published ResourceSlices, got none")
+		}
+
+		// "Update" the driver by taking it down and bringing up a new one.
+		// Pods never run in parallel, similar to how a DaemonSet would update
+		// its pods.
+		ginkgo.By("reinstall driver")
+		start := time.Now()
+		oldDriver.TearDown(ctx)
+		newDriver := NewDriverInstance(f)
+		newDriver.InstanceSuffix = "-new"
+		newDriver.Run(nodes, perNode(1, nodes))
+		updateDuration := time.Since(start)
+
+		// Build behaves the same for both driver instances.
+		b := newBuilderNow(ctx, f, oldDriver)
+		claim := b.externalClaim()
+		pod := b.podExternal()
+		b.create(ctx, claim, pod)
+		b.testPod(ctx, f, pod)
+
+		// The slices should have survived the update, but only if it happened
+		// quickly enough. If it took too long, the kubelet considered the driver
+		// gone and removed them.
+		if updateDuration <= 3*time.Minute {
+			newSlices, err := listSlices(ctx)
+			framework.ExpectNoError(err, "list slices again")
+			gomega.Expect(newSlices.Items).To(gomega.ConsistOf(oldSlices.Items))
+		}
+
+		// We need to clean up explicitly because the normal
+		// cleanup doesn't work (driver shuts down first).
+		// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
+		framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
+		framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
+
+		// Now shut down for good and wait for the kubelet to react.
+		// This takes time...
+		ginkgo.By("uninstalling driver and waiting for ResourceSlice wiping")
+		newDriver.TearDown(ctx)
+		newDriver.IsGone(ctx)
+	})
 })
 
// builder contains a running counter to make objects unique within their
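
For readers who want to see the cancellation flow from registration.go in isolation, here is a minimal, self-contained Go sketch of the same pattern: deregistration schedules a delayed wipe and records a *context.CancelCauseFunc in a map, and a re-registration within the delay window cancels it. The wiper type, its register/deregister methods, and the printed messages are hypothetical stand-ins for the kubelet's RegistrationHandler, not actual kubelet API; only the standard library is used.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// wiper mimics the pendingWipes bookkeeping of the registration handler sketched above.
type wiper struct {
	mutex        sync.Mutex
	wipingDelay  time.Duration
	pendingWipes map[string]*context.CancelCauseFunc
}

// deregister schedules wiping after wipingDelay unless register cancels it first.
func (w *wiper) deregister(plugin string) {
	ctx, cancel := context.WithCancelCause(context.Background())

	w.mutex.Lock()
	defer w.mutex.Unlock()
	if old := w.pendingWipes[plugin]; old != nil {
		(*old)(errors.New("plugin deregistered a second time"))
	}
	w.pendingWipes[plugin] = &cancel

	go func() {
		defer func() {
			w.mutex.Lock()
			defer w.mutex.Unlock()

			// Cancel our own context, but remove the map entry only if it is still ours.
			cancel(errors.New("wiping done"))
			if w.pendingWipes[plugin] == &cancel {
				delete(w.pendingWipes, plugin)
			}
		}()

		select {
		case <-ctx.Done():
			fmt.Println("wiping canceled:", context.Cause(ctx))
		case <-time.After(w.wipingDelay):
			fmt.Println("wiping ResourceSlices of", plugin)
		}
	}()
}

// register cancels a pending wipe for the plugin, if any.
func (w *wiper) register(plugin string) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if cancel := w.pendingWipes[plugin]; cancel != nil {
		(*cancel)(errors.New("new plugin instance registered"))
		delete(w.pendingWipes, plugin)
	}
}

func main() {
	w := &wiper{
		wipingDelay:  100 * time.Millisecond,
		pendingWipes: map[string]*context.CancelCauseFunc{},
	}
	w.deregister("driver.example.com") // driver goes away, wipe is scheduled
	w.register("driver.example.com")   // driver comes back quickly: wipe is canceled
	time.Sleep(200 * time.Millisecond) // give the goroutine time to log
}

Pointers to the cancel functions are stored, rather than the functions themselves, so that the goroutine can check whether its own entry is still the current one before deleting it, mirroring the comparison done in the diff.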