diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index 9d937bca73b..248df0e66eb 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "sync/atomic" "time" @@ -31,6 +32,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -546,7 +548,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { if !apiequality.Semantic.DeepEqual(¤tSlice.Spec.Pool, &desiredPool) || !apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) || currentSlice.Spec.AllNodes != desiredAllNodes || - !apiequality.Semantic.DeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) { + !DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) { changedDesiredSlices.Insert(i) logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i) } @@ -586,7 +588,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { // have listed the existing slice. slice.Spec.NodeSelector = pool.NodeSelector slice.Spec.AllNodes = desiredAllNodes - slice.Spec.Devices = pool.Slices[i].Devices + // Preserve TimeAdded from existing device, if there is a matching device and taint. + slice.Spec.Devices = copyTaintTimeAdded(slice.Spec.Devices, pool.Slices[i].Devices) logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) slice, err := c.kubeClient.ResourceV1beta1().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}) @@ -595,6 +598,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { } atomic.AddInt64(&c.numUpdates, 1) c.sliceStore.Mutation(slice) + + // Some fields may have been dropped. When we receive + // the updated slice through the informer, the + // DeepEqual fails and the controller would try to + // update again, etc. To break that cycle, update our + // desired state of the world so that it matches what + // we can store. + // + // TODO (https://github.com/kubernetes/kubernetes/issues/130856): check for dropped fields and report them to the DRA driver. + pool.Slices[i].Devices = slice.Spec.Devices } // Create new slices. @@ -652,6 +665,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error { atomic.AddInt64(&c.numCreates, 1) c.sliceStore.Mutation(slice) added = true + + // Some fields may have been dropped. When we receive + // the created slice through the informer, the + // DeepEqual fails and the controller would try to + // update, which again suffers from dropped fields, + // etc. To break that cycle, update our desired state + // of the world so that it matches what we can store. + // + // TODO (https://github.com/kubernetes/kubernetes/issues/130856): check for dropped fields and report them to the DRA driver. + pool.Slices[i].Devices = slice.Spec.Devices } if added { // Check that the recently added slice(s) really exist even @@ -713,3 +736,74 @@ func sameSlice(existingSlice *resourceapi.ResourceSlice, desiredSlice *Slice) bo // Same number of devices, names all present -> equal. return true } + +// copyTaintTimeAdded copies existing TimeAdded values from one slice into +// the other if the other one doesn't have it for a taint. Both input +// slices are read-only. +func copyTaintTimeAdded(from, to []resourceapi.Device) []resourceapi.Device { + to = slices.Clone(to) + for i, toDevice := range to { + index := slices.IndexFunc(from, func(fromDevice resourceapi.Device) bool { + return fromDevice.Name == toDevice.Name + }) + if index < 0 { + // No matching device. + continue + } + fromDevice := from[index] + if fromDevice.Basic == nil || toDevice.Basic == nil { + continue + } + for j, toTaint := range toDevice.Basic.Taints { + if toTaint.TimeAdded != nil { + // Already set. + continue + } + // Preserve the old TimeAdded if all other fields are the same. + index := slices.IndexFunc(fromDevice.Basic.Taints, func(fromTaint resourceapi.DeviceTaint) bool { + return toTaint.Key == fromTaint.Key && + toTaint.Value == fromTaint.Value && + toTaint.Effect == fromTaint.Effect + }) + if index < 0 { + // No matching old taint. + continue + } + // In practice, devices are unlikely to have many + // taints. Just clone the entire device before we + // motify it, it's unlikely that we do this more than once. + to[i] = *toDevice.DeepCopy() + to[i].Basic.Taints[j].TimeAdded = fromDevice.Basic.Taints[index].TimeAdded + } + } + return to +} + +// DevicesDeepEqual compares two slices of Devices. It behaves like +// apiequality.Semantic.DeepEqual, with one small difference: +// a nil DeviceTaint.TimeAdded is equal to a non-nil time. +// Also, rounding to full seconds (caused by round-tripping) is +// tolerated. +func DevicesDeepEqual(a, b []resourceapi.Device) bool { + return devicesSemantic.DeepEqual(a, b) +} + +var devicesSemantic = func() conversion.Equalities { + semantic := apiequality.Semantic.Copy() + if err := semantic.AddFunc(deviceTaintEqual); err != nil { + panic(err) + } + return semantic +}() + +func deviceTaintEqual(a, b resourceapi.DeviceTaint) bool { + if a.TimeAdded != nil && b.TimeAdded != nil { + delta := b.TimeAdded.Time.Sub(a.TimeAdded.Time) + if delta < -time.Second || delta > time.Second { + return false + } + } + return a.Key == b.Key && + a.Value == b.Value && + a.Effect == b.Effect +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go index c8727074065..e65d350554f 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller_test.go @@ -85,6 +85,8 @@ func TestControllerSyncPool(t *testing.T) { }}, }}, } + timeAdded = metav1.Now() + timeAddedLater = metav1.Time{Time: timeAdded.Add(time.Minute)} ) testCases := map[string]struct { @@ -143,6 +145,118 @@ func TestControllerSyncPool(t *testing.T) { Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(), }, }, + "keep-taint-unchanged": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{ + Name: deviceName, + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &timeAdded, + }}, + }}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}). + Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Generation: 1, + Slices: []Slice{{Devices: []resourceapi.Device{{ + Name: deviceName, + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Effect: resourceapi.DeviceTaintEffectNoExecute, + // No time added here! No need to update the slice. + }}, + }}}, + }}, + }, + }, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{ + Name: deviceName, + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &timeAdded, + }}, + }}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}). + Obj(), + }, + }, + "add-taint": { + nodeUID: nodeUID, + initialObjects: []runtime.Object{ + MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{ + Name: deviceName, + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &timeAdded, + }}, + }}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}). + Obj(), + }, + inputDriverResources: &DriverResources{ + Pools: map[string]Pool{ + poolName: { + Generation: 1, + Slices: []Slice{{Devices: []resourceapi.Device{{ + Name: deviceName, + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{ + { + Effect: resourceapi.DeviceTaintEffectNoExecute, + // No time added here! Time from existing slice must get copied during update. + }, + { + Key: "example.com/tainted", + Effect: resourceapi.DeviceTaintEffectNoSchedule, + TimeAdded: &timeAddedLater, + }, + }, + }}}, + }}, + }, + }, + }, + expectedStats: Stats{ + NumUpdates: 1, + }, + expectedResourceSlices: []resourceapi.ResourceSlice{ + *MakeResourceSlice().Name(generatedName1).GenerateName(generateName). + ResourceVersion("1"). + NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName). + Driver(driverName).Devices([]resourceapi.Device{{ + Name: deviceName, + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{ + { + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: &timeAdded, + }, + { + Key: "example.com/tainted", + Effect: resourceapi.DeviceTaintEffectNoSchedule, + TimeAdded: &timeAddedLater, + }, + }, + }}}). + Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}). + Obj(), + }, + }, "remove-pool": { nodeUID: nodeUID, syncDelay: ptr.To(time.Duration(0)), // Ensure that the initial object causes an immediate sync of the pool. diff --git a/test/integration/dra/dra_test.go b/test/integration/dra/dra_test.go index eea35f5a620..638beb923c9 100644 --- a/test/integration/dra/dra_test.go +++ b/test/integration/dra/dra_test.go @@ -40,6 +40,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/component-base/featuregate" featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/dynamic-resource-allocation/resourceslice" "k8s.io/klog/v2" kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" @@ -129,6 +130,7 @@ func TestDRA(t *testing.T) { tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, false) }) tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, false) }) tCtx.Run("Pod", func(tCtx ktesting.TContext) { testPod(tCtx, true) }) + tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) { testPublishResourceSlices(tCtx) }) }, }, "all": { @@ -148,6 +150,7 @@ func TestDRA(t *testing.T) { tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, true) }) tCtx.Run("Convert", testConvert) tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, true) }) + tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) { testPublishResourceSlices(tCtx) }) }, }, } { @@ -308,3 +311,71 @@ func testPrioritizedList(tCtx ktesting.TContext, enabled bool) { }).WithTimeout(time.Minute).WithPolling(time.Second).Should(schedulingAttempted) }) } + +func testPublishResourceSlices(tCtx ktesting.TContext) { + tCtx.Parallel() + + driverName := "dra.example.com" + poolName := "global" + resources := &resourceslice.DriverResources{ + Pools: map[string]resourceslice.Pool{ + poolName: { + Slices: []resourceslice.Slice{ + { + Devices: []resourceapi.Device{ + { + Name: "device-simple", + Basic: &resourceapi.BasicDevice{}, + }, + // TODO: once https://github.com/kubernetes/kubernetes/pull/130764 is merged, + // add tests which detect dropped fields related to it. + }, + }, + { + Devices: []resourceapi.Device{ + { + Name: "device-tainted-default", + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Effect: resourceapi.DeviceTaintEffectNoExecute, + // TimeAdded is added by apiserver. + }}, + }, + }, + { + Name: "device-tainted-time-added", + Basic: &resourceapi.BasicDevice{ + Taints: []resourceapi.DeviceTaint{{ + Effect: resourceapi.DeviceTaintEffectNoExecute, + TimeAdded: ptr.To(metav1.Now()), + }}, + }, + }, + }, + }, + }, + }, + }, + } + opts := resourceslice.Options{ + DriverName: driverName, + KubeClient: tCtx.Client(), + SyncDelay: ptr.To(0 * time.Second), + Resources: resources, + } + controller, err := resourceslice.StartController(tCtx, opts) + tCtx.ExpectNoError(err, "start controller") + defer controller.Stop() + + // Two create calls should be all that are needed. + expectedStats := resourceslice.Stats{ + NumCreates: 2, + } + getStats := func(tCtx ktesting.TContext) resourceslice.Stats { + return controller.GetStats() + } + ktesting.Eventually(tCtx, getStats).WithTimeout(10 * time.Second).Should(gomega.Equal(expectedStats)) + + // No further changes necessary. + ktesting.Consistently(tCtx, getStats).WithTimeout(10 * time.Second).Should(gomega.Equal(expectedStats)) +}