mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
DRA kubelet: give DRA drivers a 30 second grace period for updates
When doing an update of a DaemonSet, first the old pod gets stopped and then the new one is started. This causes the kubelet to remove all ResourceSlices directly after removal and forces the new pod to recreate all of them. Now the kubelet waits 30 seconds before it deletes ResourceSlices. If a new driver registers during that period, nothing is done at all. The new driver finds the existing ResourceSlices and only needs to update them if something changed. The downside is that if the driver gets removed permanently, this creates a delay where pods might still get scheduled to the node although the driver is not going to run there anymore and thus the pods will be stuck.
This commit is contained in:
parent
0490b9f0b7
commit
760903c0de
@ -98,7 +98,20 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
|
||||
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
|
||||
// The time that DRA drivers have to come back after being unregistered
|
||||
// before the kubelet removes their ResourceSlices.
|
||||
//
|
||||
// This must be long enough to actually allow stopping a pod and
|
||||
// starting the replacement (otherwise ResourceSlices get deleted
|
||||
// unnecessarily) and not too long (otherwise the time window were
|
||||
// pods might still get scheduled to the node after removal of a
|
||||
// driver is too long).
|
||||
//
|
||||
// 30 seconds might be long enough for a simple container restart.
|
||||
// If a DRA driver wants to be sure that slices don't get wiped,
|
||||
// it should use rolling updates.
|
||||
wipingDelay := 30 * time.Second
|
||||
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode, wipingDelay))
|
||||
}
|
||||
|
||||
// Start starts the reconcile loop of the manager.
|
||||
|
@ -580,7 +580,7 @@ func TestPrepareResources(t *testing.T) {
|
||||
}
|
||||
defer draServerInfo.teardownFn()
|
||||
|
||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
|
||||
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
|
||||
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
|
||||
}
|
||||
@ -717,7 +717,7 @@ func TestUnprepareResources(t *testing.T) {
|
||||
}
|
||||
defer draServerInfo.teardownFn()
|
||||
|
||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
|
||||
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, pluginClientTimeout); err != nil {
|
||||
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
|
||||
}
|
||||
@ -887,7 +887,7 @@ func TestParallelPrepareUnprepareResources(t *testing.T) {
|
||||
}
|
||||
defer draServerInfo.teardownFn()
|
||||
|
||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode)
|
||||
plg := plugin.NewRegistrationHandler(nil, getFakeNode, time.Second /* very short wiping delay for testing */)
|
||||
if err := plg.RegisterPlugin(driverName, draServerInfo.socketName, []string{drapb.DRAPluginService}, nil); err != nil {
|
||||
t.Fatalf("failed to register plugin %s, err: %v", driverName, err)
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
@ -53,6 +54,18 @@ type RegistrationHandler struct {
|
||||
backgroundCtx context.Context
|
||||
kubeClient kubernetes.Interface
|
||||
getNode func() (*v1.Node, error)
|
||||
wipingDelay time.Duration
|
||||
|
||||
mutex sync.Mutex
|
||||
|
||||
// pendingWipes maps a plugin name to a cancel function for
|
||||
// wiping of that plugin's ResourceSlices. Entries get added
|
||||
// in DeRegisterPlugin and check in RegisterPlugin. If
|
||||
// wiping is pending during RegisterPlugin, it gets canceled.
|
||||
//
|
||||
// Must use pointers to functions because the entries have to
|
||||
// be comparable.
|
||||
pendingWipes map[string]*context.CancelCauseFunc
|
||||
}
|
||||
|
||||
var _ cache.PluginHandler = &RegistrationHandler{}
|
||||
@ -62,12 +75,14 @@ var _ cache.PluginHandler = &RegistrationHandler{}
|
||||
// Must only be called once per process because it manages global state.
|
||||
// If a kubeClient is provided, then it synchronizes ResourceSlices
|
||||
// with the resource information provided by plugins.
|
||||
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *RegistrationHandler {
|
||||
func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1.Node, error), wipingDelay time.Duration) *RegistrationHandler {
|
||||
handler := &RegistrationHandler{
|
||||
// The context and thus logger should come from the caller.
|
||||
backgroundCtx: klog.NewContext(context.TODO(), klog.LoggerWithName(klog.TODO(), "DRA registration handler")),
|
||||
kubeClient: kubeClient,
|
||||
getNode: getNode,
|
||||
wipingDelay: wipingDelay,
|
||||
pendingWipes: make(map[string]*context.CancelCauseFunc),
|
||||
}
|
||||
|
||||
// When kubelet starts up, no DRA driver has registered yet. None of
|
||||
@ -77,19 +92,34 @@ func NewRegistrationHandler(kubeClient kubernetes.Interface, getNode func() (*v1
|
||||
// to start up.
|
||||
//
|
||||
// This has to run in the background.
|
||||
go handler.wipeResourceSlices("")
|
||||
logger := klog.LoggerWithName(klog.FromContext(handler.backgroundCtx), "startup")
|
||||
ctx := klog.NewContext(handler.backgroundCtx, logger)
|
||||
go handler.wipeResourceSlices(ctx, 0 /* no delay */, "" /* all drivers */)
|
||||
|
||||
return handler
|
||||
}
|
||||
|
||||
// wipeResourceSlices deletes ResourceSlices of the node, optionally just for a specific driver.
|
||||
func (h *RegistrationHandler) wipeResourceSlices(driver string) {
|
||||
// Wiping will delay for a while and can be canceled by canceling the context.
|
||||
func (h *RegistrationHandler) wipeResourceSlices(ctx context.Context, delay time.Duration, driver string) {
|
||||
if h.kubeClient == nil {
|
||||
return
|
||||
}
|
||||
ctx := h.backgroundCtx
|
||||
logger := klog.FromContext(ctx)
|
||||
|
||||
if delay != 0 {
|
||||
// Before we start deleting, give the driver time to bounce back.
|
||||
// Perhaps it got removed as part of a DaemonSet update and the
|
||||
// replacement pod is about to start.
|
||||
logger.V(4).Info("Starting to wait before wiping ResourceSlices", "delay", delay)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.V(4).Info("Aborting wiping of ResourceSlices", "reason", context.Cause(ctx))
|
||||
case <-time.After(delay):
|
||||
logger.V(4).Info("Starting to wipe ResourceSlices after waiting", "delay", delay)
|
||||
}
|
||||
}
|
||||
|
||||
backoff := wait.Backoff{
|
||||
Duration: time.Second,
|
||||
Factor: 2,
|
||||
@ -184,6 +214,15 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
|
||||
logger.V(1).Info("DRA plugin already registered, the old plugin was replaced and will be forgotten by the kubelet till the next kubelet restart", "oldEndpoint", oldPlugin.endpoint)
|
||||
}
|
||||
|
||||
// Now cancel any pending ResourceSlice wiping for this plugin.
|
||||
// Only needs to be done once.
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
if cancel := h.pendingWipes[pluginName]; cancel != nil {
|
||||
(*cancel)(errors.New("new plugin instance registered"))
|
||||
delete(h.pendingWipes, pluginName)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -225,11 +264,42 @@ func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
|
||||
logger := klog.FromContext(p.backgroundCtx)
|
||||
logger.V(3).Info("Deregister DRA plugin", "endpoint", p.endpoint)
|
||||
|
||||
// Prepare for canceling the background wiping. This needs to run
|
||||
// in the context of the registration handler, the one from
|
||||
// the plugin is canceled.
|
||||
logger = klog.FromContext(h.backgroundCtx)
|
||||
logger = klog.LoggerWithName(logger, "driver-cleanup")
|
||||
logger = klog.LoggerWithValues(logger, "pluginName", pluginName)
|
||||
ctx, cancel := context.WithCancelCause(h.backgroundCtx)
|
||||
ctx = klog.NewContext(ctx, logger)
|
||||
|
||||
// Clean up the ResourceSlices for the deleted Plugin since it
|
||||
// may have died without doing so itself and might never come
|
||||
// back.
|
||||
go h.wipeResourceSlices(pluginName)
|
||||
//
|
||||
// May get canceled if the plugin comes back quickly enough
|
||||
// (see RegisterPlugin).
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
if cancel := h.pendingWipes[pluginName]; cancel != nil {
|
||||
(*cancel)(errors.New("plugin deregistered a second time"))
|
||||
}
|
||||
h.pendingWipes[pluginName] = &cancel
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
h.mutex.Lock()
|
||||
defer h.mutex.Unlock()
|
||||
|
||||
// Cancel our own context, but remove it from the map only if it
|
||||
// is the current entry. Perhaps it already got replaced.
|
||||
cancel(errors.New("wiping done"))
|
||||
if h.pendingWipes[pluginName] == &cancel {
|
||||
delete(h.pendingWipes, pluginName)
|
||||
}
|
||||
}()
|
||||
h.wipeResourceSlices(ctx, h.wipingDelay, pluginName)
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -157,7 +157,7 @@ func TestRegistrationHandler(t *testing.T) {
|
||||
}
|
||||
|
||||
// The handler wipes all slices at startup.
|
||||
handler := NewRegistrationHandler(client, getFakeNode)
|
||||
handler := NewRegistrationHandler(client, getFakeNode, time.Second /* very short wiping delay for testing */)
|
||||
requireNoSlices := func() {
|
||||
t.Helper()
|
||||
if client == nil {
|
||||
|
@ -66,6 +66,7 @@ import (
|
||||
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
|
||||
"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
|
||||
"k8s.io/kubernetes/test/e2e/storage/utils"
|
||||
"k8s.io/utils/ptr"
|
||||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
||||
@ -214,7 +215,6 @@ func (d *Driver) Run(nodes *Nodes, configureResources func() Resources, devicesP
|
||||
// not run on all nodes.
|
||||
resources.Nodes = nodes.NodeNames
|
||||
}
|
||||
ginkgo.DeferCleanup(d.IsGone) // Register first so it gets called last.
|
||||
d.SetUp(nodes, resources, devicesPerNode...)
|
||||
ginkgo.DeferCleanup(d.TearDown)
|
||||
}
|
||||
@ -227,12 +227,22 @@ type MethodInstance struct {
|
||||
type Driver struct {
|
||||
f *framework.Framework
|
||||
ctx context.Context
|
||||
cleanup []func() // executed first-in-first-out
|
||||
cleanup []func(context.Context) // executed first-in-first-out
|
||||
wg sync.WaitGroup
|
||||
serviceAccountName string
|
||||
|
||||
// NameSuffix can be set while registering a test to deploy different
|
||||
// drivers in the same test namespace.
|
||||
NameSuffix string
|
||||
Name string
|
||||
|
||||
// InstanceSuffix can be set while registering a test to deploy two different
|
||||
// instances of the same driver. Used to generate unique objects in the API server.
|
||||
// The socket path is still the same.
|
||||
InstanceSuffix string
|
||||
|
||||
// Name gets derived automatically from the current test namespace and
|
||||
// (if set) the NameSuffix while setting up the driver for a test.
|
||||
Name string
|
||||
|
||||
// Nodes contains entries for each node selected for a test when the test runs.
|
||||
// In addition, there is one entry for a fictional node.
|
||||
@ -263,9 +273,13 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
logger := klog.FromContext(ctx)
|
||||
logger = klog.LoggerWithValues(logger, "driverName", d.Name)
|
||||
if d.InstanceSuffix != "" {
|
||||
instance, _ := strings.CutPrefix(d.InstanceSuffix, "-")
|
||||
logger = klog.LoggerWithValues(logger, "instance", instance)
|
||||
}
|
||||
ctx = klog.NewContext(ctx, logger)
|
||||
d.ctx = ctx
|
||||
d.cleanup = append(d.cleanup, cancel)
|
||||
d.cleanup = append(d.cleanup, func(context.Context) { cancel() })
|
||||
|
||||
if !resources.NodeLocal {
|
||||
// Publish one resource pool with "network-attached" devices.
|
||||
@ -323,28 +337,32 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
|
||||
}
|
||||
|
||||
// Create service account and corresponding RBAC rules.
|
||||
d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + "-service-account"
|
||||
d.serviceAccountName = "dra-kubelet-plugin-" + d.Name + d.InstanceSuffix + "-service-account"
|
||||
content := pluginPermissions
|
||||
content = strings.ReplaceAll(content, "dra-kubelet-plugin-namespace", d.f.Namespace.Name)
|
||||
content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name)
|
||||
content = strings.ReplaceAll(content, "dra-kubelet-plugin", "dra-kubelet-plugin-"+d.Name+d.InstanceSuffix)
|
||||
d.createFromYAML(ctx, []byte(content), d.f.Namespace.Name)
|
||||
|
||||
// Using a ReplicaSet instead of a DaemonSet has the advantage that we can control
|
||||
// the lifecycle explicitly, in particular run two pods per node long enough to
|
||||
// run checks.
|
||||
instanceKey := "app.kubernetes.io/instance"
|
||||
rsName := ""
|
||||
numNodes := int32(len(nodes.NodeNames))
|
||||
pluginDataDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins", d.Name)
|
||||
registrarDirectoryPath := path.Join(framework.TestContext.KubeletRootDir, "plugins_registry")
|
||||
registrarSocketFilename := d.Name + "-reg.sock"
|
||||
instanceName := d.Name + d.InstanceSuffix
|
||||
err := utils.CreateFromManifests(ctx, d.f, d.f.Namespace, func(item interface{}) error {
|
||||
switch item := item.(type) {
|
||||
case *appsv1.ReplicaSet:
|
||||
item.Name += d.NameSuffix
|
||||
item.Name += d.NameSuffix + d.InstanceSuffix
|
||||
rsName = item.Name
|
||||
item.Spec.Replicas = &numNodes
|
||||
item.Spec.Selector.MatchLabels[instanceKey] = d.Name
|
||||
item.Spec.Template.Labels[instanceKey] = d.Name
|
||||
item.Spec.Selector.MatchLabels[instanceKey] = instanceName
|
||||
item.Spec.Template.Labels[instanceKey] = instanceName
|
||||
item.Spec.Template.Spec.ServiceAccountName = d.serviceAccountName
|
||||
item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = d.Name
|
||||
item.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution[0].LabelSelector.MatchLabels[instanceKey] = instanceName
|
||||
item.Spec.Template.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
|
||||
NodeSelectorTerms: []v1.NodeSelectorTerm{
|
||||
@ -376,7 +394,7 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
|
||||
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, numNodes); err != nil {
|
||||
framework.ExpectNoError(err, "all kubelet plugin proxies running")
|
||||
}
|
||||
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{d.Name})
|
||||
requirement, err := labels.NewRequirement(instanceKey, selection.Equals, []string{instanceName})
|
||||
framework.ExpectNoError(err, "create label selector requirement")
|
||||
selector := labels.NewSelector().Add(*requirement)
|
||||
pods, err := d.f.ClientSet.CoreV1().Pods(d.f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
|
||||
@ -446,9 +464,20 @@ func (d *Driver) SetUp(nodes *Nodes, resources Resources, devicesPerNode ...map[
|
||||
kubeletplugin.RegistrarListener(listen(d.f, &pod, &listenerPort)),
|
||||
)
|
||||
framework.ExpectNoError(err, "start kubelet plugin for node %s", pod.Spec.NodeName)
|
||||
d.cleanup = append(d.cleanup, func() {
|
||||
d.cleanup = append(d.cleanup, func(ctx context.Context) {
|
||||
// Depends on cancel being called first.
|
||||
plugin.Stop()
|
||||
|
||||
// Also explicitly stop all pods.
|
||||
ginkgo.By("scaling down driver proxy pods for " + d.Name)
|
||||
rs, err := d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Get(ctx, rsName, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err, "get ReplicaSet for driver "+d.Name)
|
||||
rs.Spec.Replicas = ptr.To(int32(0))
|
||||
rs, err = d.f.ClientSet.AppsV1().ReplicaSets(d.f.Namespace.Name).Update(ctx, rs, metav1.UpdateOptions{})
|
||||
framework.ExpectNoError(err, "scale down ReplicaSet for driver "+d.Name)
|
||||
if err := e2ereplicaset.WaitForReplicaSetTargetAvailableReplicas(ctx, d.f.ClientSet, rs, 0); err != nil {
|
||||
framework.ExpectNoError(err, "all kubelet plugin proxies stopped")
|
||||
}
|
||||
})
|
||||
d.Nodes[nodename] = KubeletPlugin{ExamplePlugin: plugin, ClientSet: driverClient}
|
||||
}
|
||||
@ -717,14 +746,19 @@ func pipe(ctx context.Context, msg string, verbosity int) *io.PipeWriter {
|
||||
return writer
|
||||
}
|
||||
|
||||
func (d *Driver) TearDown() {
|
||||
func (d *Driver) TearDown(ctx context.Context) {
|
||||
for _, c := range d.cleanup {
|
||||
c()
|
||||
c(ctx)
|
||||
}
|
||||
d.cleanup = nil
|
||||
d.wg.Wait()
|
||||
}
|
||||
|
||||
// IsGone checks that the kubelet is done with the driver.
|
||||
// This is done by waiting for the kubelet to remove the
|
||||
// driver's ResourceSlices, which takes at least 5 minutes
|
||||
// because of the delay in the kubelet. Only use this in slow
|
||||
// tests...
|
||||
func (d *Driver) IsGone(ctx context.Context) {
|
||||
gomega.Eventually(ctx, func(ctx context.Context) ([]resourceapi.ResourceSlice, error) {
|
||||
slices, err := d.f.ClientSet.ResourceV1beta1().ResourceSlices().List(ctx, metav1.ListOptions{FieldSelector: resourceapi.ResourceSliceSelectorDriver + "=" + d.Name})
|
||||
@ -732,7 +766,7 @@ func (d *Driver) IsGone(ctx context.Context) {
|
||||
return nil, err
|
||||
}
|
||||
return slices.Items, err
|
||||
}).Should(gomega.BeEmpty())
|
||||
}).WithTimeout(7 * time.Minute).Should(gomega.BeEmpty())
|
||||
}
|
||||
|
||||
func (d *Driver) interceptor(nodename string, ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
|
||||
|
@ -1801,6 +1801,65 @@ var _ = framework.SIGDescribe("node")("DRA", feature.DynamicResourceAllocation,
|
||||
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
|
||||
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
|
||||
})
|
||||
|
||||
f.It("sequential update with pods replacing each other", framework.WithSlow(), func(ctx context.Context) {
|
||||
nodes := NewNodesNow(ctx, f, 1, 1)
|
||||
|
||||
// Same driver name, same socket path.
|
||||
oldDriver := NewDriverInstance(f)
|
||||
oldDriver.InstanceSuffix = "-old"
|
||||
oldDriver.Run(nodes, perNode(1, nodes))
|
||||
|
||||
// Collect set of resource slices for that driver.
|
||||
listSlices := framework.ListObjects(f.ClientSet.ResourceV1beta1().ResourceSlices().List, metav1.ListOptions{
|
||||
FieldSelector: "spec.driver=" + oldDriver.Name,
|
||||
})
|
||||
gomega.Eventually(ctx, listSlices).Should(gomega.HaveField("Items", gomega.Not(gomega.BeEmpty())), "driver should have published ResourceSlices, got none")
|
||||
oldSlices, err := listSlices(ctx)
|
||||
framework.ExpectNoError(err, "list slices published by old driver")
|
||||
if len(oldSlices.Items) == 0 {
|
||||
framework.Fail("driver should have published ResourceSlices, got none")
|
||||
}
|
||||
|
||||
// "Update" the driver by taking it down and bringing up a new one.
|
||||
// Pods never run in parallel, similar to how a DaemonSet would update
|
||||
// its pods.
|
||||
ginkgo.By("reinstall driver")
|
||||
start := time.Now()
|
||||
oldDriver.TearDown(ctx)
|
||||
newDriver := NewDriverInstance(f)
|
||||
newDriver.InstanceSuffix = "-new"
|
||||
newDriver.Run(nodes, perNode(1, nodes))
|
||||
updateDuration := time.Since(start)
|
||||
|
||||
// Build behaves the same for both driver instances.
|
||||
b := newBuilderNow(ctx, f, oldDriver)
|
||||
claim := b.externalClaim()
|
||||
pod := b.podExternal()
|
||||
b.create(ctx, claim, pod)
|
||||
b.testPod(ctx, f, pod)
|
||||
|
||||
// The slices should have survived the update, but only if it happened
|
||||
// quickly enough. If it took too long, the kubelet considered the driver
|
||||
// gone and removed them.
|
||||
if updateDuration <= 3*time.Minute {
|
||||
newSlices, err := listSlices(ctx)
|
||||
framework.ExpectNoError(err, "list slices again")
|
||||
gomega.Expect(newSlices.Items).To(gomega.ConsistOf(oldSlices.Items))
|
||||
}
|
||||
|
||||
// We need to clean up explicitly because the normal
|
||||
// cleanup doesn't work (driver shuts down first).
|
||||
// framework.ExpectNoError(f.ClientSet.ResourceV1beta1().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}))
|
||||
framework.ExpectNoError(f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}))
|
||||
framework.ExpectNoError(e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete))
|
||||
|
||||
// Now shut down for good and wait for the kubelet to react.
|
||||
// This takes time...
|
||||
ginkgo.By("uninstalling driver and waiting for ResourceSlice wiping")
|
||||
newDriver.TearDown(ctx)
|
||||
newDriver.IsGone(ctx)
|
||||
})
|
||||
})
|
||||
|
||||
// builder contains a running counter to make objects unique within thir
|
||||
|
Loading…
Reference in New Issue
Block a user