DRA helper: support dropped fields and TimeAdded defaults

Both the new DeviceTaint.TimeAdded and dropped fields when
the DRADeviceTaints feature is disabled confused the ResourceSlice
controller because what is stored and sent back can be different
from what the controller wants to store.

It's now more lenient regarding TimeAdded (doesn't need to be exact because of
rounding during serialization, only having a value on the server is okay)
and dropped fields (doesn't try to store them again). It also preserves
a server-side TimeAdded when updating slices.
This commit is contained in:
Patrick Ohly 2025-03-16 21:21:04 +01:00
parent 2499663b84
commit 37b47f4724
3 changed files with 281 additions and 2 deletions

View File

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
@ -31,6 +32,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -546,7 +548,7 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
if !apiequality.Semantic.DeepEqual(&currentSlice.Spec.Pool, &desiredPool) ||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.NodeSelector, pool.NodeSelector) ||
currentSlice.Spec.AllNodes != desiredAllNodes ||
!apiequality.Semantic.DeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) {
!DevicesDeepEqual(currentSlice.Spec.Devices, pool.Slices[i].Devices) {
changedDesiredSlices.Insert(i)
logger.V(5).Info("Need to update slice", "slice", klog.KObj(currentSlice), "matchIndex", i)
}
@ -586,7 +588,8 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
// have listed the existing slice.
slice.Spec.NodeSelector = pool.NodeSelector
slice.Spec.AllNodes = desiredAllNodes
slice.Spec.Devices = pool.Slices[i].Devices
// Preserve TimeAdded from existing device, if there is a matching device and taint.
slice.Spec.Devices = copyTaintTimeAdded(slice.Spec.Devices, pool.Slices[i].Devices)
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
slice, err := c.kubeClient.ResourceV1beta1().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{})
@ -595,6 +598,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
}
atomic.AddInt64(&c.numUpdates, 1)
c.sliceStore.Mutation(slice)
// Some fields may have been dropped. When we receive
// the updated slice through the informer, the
// DeepEqual fails and the controller would try to
// update again, etc. To break that cycle, update our
// desired state of the world so that it matches what
// we can store.
//
// TODO (https://github.com/kubernetes/kubernetes/issues/130856): check for dropped fields and report them to the DRA driver.
pool.Slices[i].Devices = slice.Spec.Devices
}
// Create new slices.
@ -652,6 +665,16 @@ func (c *Controller) syncPool(ctx context.Context, poolName string) error {
atomic.AddInt64(&c.numCreates, 1)
c.sliceStore.Mutation(slice)
added = true
// Some fields may have been dropped. When we receive
// the created slice through the informer, the
// DeepEqual fails and the controller would try to
// update, which again suffers from dropped fields,
// etc. To break that cycle, update our desired state
// of the world so that it matches what we can store.
//
// TODO (https://github.com/kubernetes/kubernetes/issues/130856): check for dropped fields and report them to the DRA driver.
pool.Slices[i].Devices = slice.Spec.Devices
}
if added {
// Check that the recently added slice(s) really exist even
@ -713,3 +736,74 @@ func sameSlice(existingSlice *resourceapi.ResourceSlice, desiredSlice *Slice) bo
// Same number of devices, names all present -> equal.
return true
}
// copyTaintTimeAdded copies existing TimeAdded values from one slice into
// the other if the other one doesn't have it for a taint. Both input
// slices are read-only.
func copyTaintTimeAdded(from, to []resourceapi.Device) []resourceapi.Device {
to = slices.Clone(to)
for i, toDevice := range to {
index := slices.IndexFunc(from, func(fromDevice resourceapi.Device) bool {
return fromDevice.Name == toDevice.Name
})
if index < 0 {
// No matching device.
continue
}
fromDevice := from[index]
if fromDevice.Basic == nil || toDevice.Basic == nil {
continue
}
for j, toTaint := range toDevice.Basic.Taints {
if toTaint.TimeAdded != nil {
// Already set.
continue
}
// Preserve the old TimeAdded if all other fields are the same.
index := slices.IndexFunc(fromDevice.Basic.Taints, func(fromTaint resourceapi.DeviceTaint) bool {
return toTaint.Key == fromTaint.Key &&
toTaint.Value == fromTaint.Value &&
toTaint.Effect == fromTaint.Effect
})
if index < 0 {
// No matching old taint.
continue
}
// In practice, devices are unlikely to have many
// taints. Just clone the entire device before we
// motify it, it's unlikely that we do this more than once.
to[i] = *toDevice.DeepCopy()
to[i].Basic.Taints[j].TimeAdded = fromDevice.Basic.Taints[index].TimeAdded
}
}
return to
}
// DevicesDeepEqual compares two slices of Devices. It behaves like
// apiequality.Semantic.DeepEqual, with one small difference:
// a nil DeviceTaint.TimeAdded is equal to a non-nil time.
// Also, rounding to full seconds (caused by round-tripping) is
// tolerated.
func DevicesDeepEqual(a, b []resourceapi.Device) bool {
return devicesSemantic.DeepEqual(a, b)
}
var devicesSemantic = func() conversion.Equalities {
semantic := apiequality.Semantic.Copy()
if err := semantic.AddFunc(deviceTaintEqual); err != nil {
panic(err)
}
return semantic
}()
func deviceTaintEqual(a, b resourceapi.DeviceTaint) bool {
if a.TimeAdded != nil && b.TimeAdded != nil {
delta := b.TimeAdded.Time.Sub(a.TimeAdded.Time)
if delta < -time.Second || delta > time.Second {
return false
}
}
return a.Key == b.Key &&
a.Value == b.Value &&
a.Effect == b.Effect
}

View File

@ -85,6 +85,8 @@ func TestControllerSyncPool(t *testing.T) {
}},
}},
}
timeAdded = metav1.Now()
timeAddedLater = metav1.Time{Time: timeAdded.Add(time.Minute)}
)
testCases := map[string]struct {
@ -143,6 +145,118 @@ func TestControllerSyncPool(t *testing.T) {
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).Obj(),
},
},
"keep-taint-unchanged": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{
MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
Driver(driverName).Devices([]resourceapi.Device{{
Name: deviceName,
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{{
Effect: resourceapi.DeviceTaintEffectNoExecute,
TimeAdded: &timeAdded,
}},
}}}).
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
Obj(),
},
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Generation: 1,
Slices: []Slice{{Devices: []resourceapi.Device{{
Name: deviceName,
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{{
Effect: resourceapi.DeviceTaintEffectNoExecute,
// No time added here! No need to update the slice.
}},
}}},
}},
},
},
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
Driver(driverName).Devices([]resourceapi.Device{{
Name: deviceName,
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{{
Effect: resourceapi.DeviceTaintEffectNoExecute,
TimeAdded: &timeAdded,
}},
}}}).
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
Obj(),
},
},
"add-taint": {
nodeUID: nodeUID,
initialObjects: []runtime.Object{
MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
Driver(driverName).Devices([]resourceapi.Device{{
Name: deviceName,
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{{
Effect: resourceapi.DeviceTaintEffectNoExecute,
TimeAdded: &timeAdded,
}},
}}}).
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
Obj(),
},
inputDriverResources: &DriverResources{
Pools: map[string]Pool{
poolName: {
Generation: 1,
Slices: []Slice{{Devices: []resourceapi.Device{{
Name: deviceName,
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{
{
Effect: resourceapi.DeviceTaintEffectNoExecute,
// No time added here! Time from existing slice must get copied during update.
},
{
Key: "example.com/tainted",
Effect: resourceapi.DeviceTaintEffectNoSchedule,
TimeAdded: &timeAddedLater,
},
},
}}},
}},
},
},
},
expectedStats: Stats{
NumUpdates: 1,
},
expectedResourceSlices: []resourceapi.ResourceSlice{
*MakeResourceSlice().Name(generatedName1).GenerateName(generateName).
ResourceVersion("1").
NodeOwnerReferences(ownerName, string(nodeUID)).NodeName(ownerName).
Driver(driverName).Devices([]resourceapi.Device{{
Name: deviceName,
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{
{
Effect: resourceapi.DeviceTaintEffectNoExecute,
TimeAdded: &timeAdded,
},
{
Key: "example.com/tainted",
Effect: resourceapi.DeviceTaintEffectNoSchedule,
TimeAdded: &timeAddedLater,
},
},
}}}).
Pool(resourceapi.ResourcePool{Name: poolName, Generation: 1, ResourceSliceCount: 1}).
Obj(),
},
},
"remove-pool": {
nodeUID: nodeUID,
syncDelay: ptr.To(time.Duration(0)), // Ensure that the initial object causes an immediate sync of the pool.

View File

@ -40,6 +40,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/dynamic-resource-allocation/resourceslice"
"k8s.io/klog/v2"
kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
@ -129,6 +130,7 @@ func TestDRA(t *testing.T) {
tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, false) })
tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, false) })
tCtx.Run("Pod", func(tCtx ktesting.TContext) { testPod(tCtx, true) })
tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) { testPublishResourceSlices(tCtx) })
},
},
"all": {
@ -148,6 +150,7 @@ func TestDRA(t *testing.T) {
tCtx.Run("AdminAccess", func(tCtx ktesting.TContext) { testAdminAccess(tCtx, true) })
tCtx.Run("Convert", testConvert)
tCtx.Run("PrioritizedList", func(tCtx ktesting.TContext) { testPrioritizedList(tCtx, true) })
tCtx.Run("PublishResourceSlices", func(tCtx ktesting.TContext) { testPublishResourceSlices(tCtx) })
},
},
} {
@ -308,3 +311,71 @@ func testPrioritizedList(tCtx ktesting.TContext, enabled bool) {
}).WithTimeout(time.Minute).WithPolling(time.Second).Should(schedulingAttempted)
})
}
func testPublishResourceSlices(tCtx ktesting.TContext) {
tCtx.Parallel()
driverName := "dra.example.com"
poolName := "global"
resources := &resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{
poolName: {
Slices: []resourceslice.Slice{
{
Devices: []resourceapi.Device{
{
Name: "device-simple",
Basic: &resourceapi.BasicDevice{},
},
// TODO: once https://github.com/kubernetes/kubernetes/pull/130764 is merged,
// add tests which detect dropped fields related to it.
},
},
{
Devices: []resourceapi.Device{
{
Name: "device-tainted-default",
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{{
Effect: resourceapi.DeviceTaintEffectNoExecute,
// TimeAdded is added by apiserver.
}},
},
},
{
Name: "device-tainted-time-added",
Basic: &resourceapi.BasicDevice{
Taints: []resourceapi.DeviceTaint{{
Effect: resourceapi.DeviceTaintEffectNoExecute,
TimeAdded: ptr.To(metav1.Now()),
}},
},
},
},
},
},
},
},
}
opts := resourceslice.Options{
DriverName: driverName,
KubeClient: tCtx.Client(),
SyncDelay: ptr.To(0 * time.Second),
Resources: resources,
}
controller, err := resourceslice.StartController(tCtx, opts)
tCtx.ExpectNoError(err, "start controller")
defer controller.Stop()
// Two create calls should be all that are needed.
expectedStats := resourceslice.Stats{
NumCreates: 2,
}
getStats := func(tCtx ktesting.TContext) resourceslice.Stats {
return controller.GetStats()
}
ktesting.Eventually(tCtx, getStats).WithTimeout(10 * time.Second).Should(gomega.Equal(expectedStats))
// No further changes necessary.
ktesting.Consistently(tCtx, getStats).WithTimeout(10 * time.Second).Should(gomega.Equal(expectedStats))
}