DRA: update helper packages

Publishing ResourceSlices now supports network-attached devices and the new
v1alpha3 API. The logic for splitting the information across different slices is still missing.
Patrick Ohly 2024-07-11 16:15:46 +02:00
parent 91d7882e86
commit 20f98f3a2f
5 changed files with 240 additions and 382 deletions
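For illustration, this is roughly what publishing node-local devices looks like for a driver built on the updated kubeletplugin helper. A minimal sketch, not part of the diff: the plugin value (returned by kubeletplugin.Start), ctx and the device names are placeholders.

// Sketch only. Assumes resourceapi is "k8s.io/api/resource/v1alpha3" and
// plugin implements kubeletplugin.DRAPlugin.
devices := []resourceapi.Device{
    {Name: "gpu-0"}, // a real device would also set Basic with attributes/capacity
    {Name: "gpu-1"},
}
plugin.PublishResources(ctx, kubeletplugin.Resources{Devices: devices})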


@ -62,20 +62,6 @@ type Controller interface {
// Driver provides the actual allocation and deallocation operations.
type Driver interface {
// GetClassParameters is called to retrieve the parameter object
// referenced by a class. The content should be validated now if
// possible. class.Parameters may be nil.
//
// The caller wraps the error to include the parameter reference.
GetClassParameters(ctx context.Context, class *resourceapi.ResourceClass) (interface{}, error)
// GetClaimParameters is called to retrieve the parameter object
// referenced by a claim. The content should be validated now if
// possible. claim.Spec.Parameters may be nil.
//
// The caller wraps the error to include the parameter reference.
GetClaimParameters(ctx context.Context, claim *resourceapi.ResourceClaim, class *resourceapi.ResourceClass, classParameters interface{}) (interface{}, error)
// Allocate is called when all same-driver ResourceClaims for Pod are ready
// to be allocated. The selectedNode is empty for ResourceClaims with immediate
// allocation, in which case the resource driver decides itself where
@ -136,11 +122,9 @@ type Driver interface {
// ClaimAllocation represents information about one particular
// pod.Spec.ResourceClaim entry.
type ClaimAllocation struct {
PodClaimName string
Claim *resourceapi.ResourceClaim
Class *resourceapi.ResourceClass
ClaimParameters interface{}
ClassParameters interface{}
PodClaimName string
Claim *resourceapi.ResourceClaim
DeviceClasses map[string]*resourceapi.DeviceClass
// UnsuitableNodes needs to be filled in by the driver when
// Driver.UnsuitableNodes gets called.
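With class and parameter lookup gone, a driver's Allocate implementation works directly off these fields. A rough sketch, assuming this package is imported as controller, the usual klog/resourceapi imports, and a hypothetical myDriver type:

// Sketch only; myDriver is not part of this commit.
func (d *myDriver) Allocate(ctx context.Context, claims []*controller.ClaimAllocation, selectedNode string) {
    logger := klog.FromContext(ctx)
    for _, ca := range claims {
        // The helper already resolved the DeviceClass for every request (see checkPodClaim).
        for name, class := range ca.DeviceClasses {
            logger.V(5).Info("allocating", "claim", klog.KObj(ca.Claim), "deviceClass", name, "classSpec", class.Spec)
        }
        // Report success by filling in Allocation; on failure set ca.Error instead.
        // The helper sets Allocation.Controller before writing the claim status.
        ca.Allocation = &resourceapi.AllocationResult{}
    }
}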
@ -162,15 +146,12 @@ type controller struct {
driver Driver
setReservedFor bool
kubeClient kubernetes.Interface
claimNameLookup *resourceclaim.Lookup
queue workqueue.TypedRateLimitingInterface[string]
eventRecorder record.EventRecorder
rcLister resourcelisters.ResourceClassLister
rcSynced cache.InformerSynced
dcLister resourcelisters.DeviceClassLister
claimCache cache.MutationCache
schedulingCtxLister resourcelisters.PodSchedulingContextLister
claimSynced cache.InformerSynced
schedulingCtxSynced cache.InformerSynced
synced []cache.InformerSynced
}
// TODO: make it configurable
@ -184,10 +165,9 @@ func New(
kubeClient kubernetes.Interface,
informerFactory informers.SharedInformerFactory) Controller {
logger := klog.LoggerWithName(klog.FromContext(ctx), "resource controller")
rcInformer := informerFactory.Resource().V1alpha3().ResourceClasses()
dcInformer := informerFactory.Resource().V1alpha3().DeviceClasses()
claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
schedulingCtxInformer := informerFactory.Resource().V1alpha3().PodSchedulingContexts()
claimNameLookup := resourceclaim.NewNameLookup(kubeClient)
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
go func() {
@ -228,15 +208,16 @@ func New(
driver: driver,
setReservedFor: true,
kubeClient: kubeClient,
claimNameLookup: claimNameLookup,
rcLister: rcInformer.Lister(),
rcSynced: rcInformer.Informer().HasSynced,
dcLister: dcInformer.Lister(),
claimCache: claimCache,
claimSynced: claimInformer.Informer().HasSynced,
schedulingCtxLister: schedulingCtxInformer.Lister(),
schedulingCtxSynced: schedulingCtxInformer.Informer().HasSynced,
queue: queue,
eventRecorder: eventRecorder,
synced: []cache.InformerSynced{
dcInformer.Informer().HasSynced,
claimInformer.Informer().HasSynced,
schedulingCtxInformer.Informer().HasSynced,
},
}
loggerV6 := logger.V(6)
@ -341,7 +322,7 @@ func (ctrl *controller) Run(workers int) {
stopCh := ctrl.ctx.Done()
if !cache.WaitForCacheSync(stopCh, ctrl.rcSynced, ctrl.claimSynced, ctrl.schedulingCtxSynced) {
if !cache.WaitForCacheSync(stopCh, ctrl.synced...) {
ctrl.logger.Error(nil, "Cannot sync caches")
return
}
@ -471,20 +452,19 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour
if claim.Status.Allocation != nil {
// Allocation was completed. Deallocate before proceeding.
if err := ctrl.driver.Deallocate(ctx, claim); err != nil {
return fmt.Errorf("deallocate: %v", err)
return fmt.Errorf("deallocate: %w", err)
}
claim.Status.Allocation = nil
claim.Status.DriverName = ""
claim.Status.DeallocationRequested = false
claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("remove allocation: %v", err)
return fmt.Errorf("remove allocation: %w", err)
}
ctrl.claimCache.Mutation(claim)
} else {
// Ensure that there is no on-going allocation.
if err := ctrl.driver.Deallocate(ctx, claim); err != nil {
return fmt.Errorf("stop allocation: %v", err)
return fmt.Errorf("stop allocation: %w", err)
}
}
@ -493,7 +473,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour
claim.Status.DeallocationRequested = false
claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("remove deallocation: %v", err)
return fmt.Errorf("remove deallocation: %w", err)
}
ctrl.claimCache.Mutation(claim)
}
@ -501,7 +481,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour
claim.Finalizers = ctrl.removeFinalizer(claim.Finalizers)
claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("remove finalizer: %v", err)
return fmt.Errorf("remove finalizer: %w", err)
}
ctrl.claimCache.Mutation(claim)
}
@ -519,24 +499,6 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour
return nil
}
func (ctrl *controller) getParameters(ctx context.Context, claim *resourceapi.ResourceClaim, class *resourceapi.ResourceClass, notifyClaim bool) (claimParameters, classParameters interface{}, err error) {
classParameters, err = ctrl.driver.GetClassParameters(ctx, class)
if err != nil {
ctrl.eventRecorder.Event(class, v1.EventTypeWarning, "Failed", err.Error())
err = fmt.Errorf("class parameters %s: %v", class.ParametersRef, err)
return
}
claimParameters, err = ctrl.driver.GetClaimParameters(ctx, claim, class, classParameters)
if err != nil {
if notifyClaim {
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "Failed", err.Error())
}
err = fmt.Errorf("claim parameters %s: %v", claim.Spec.ParametersRef, err)
return
}
return
}
// allocateClaims filters list of claims, keeps those needing allocation and asks driver to do the allocations.
// Driver is supposed to write the AllocationResult and Error field into argument claims slice.
func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAllocation, selectedNode string, selectedUser *resourceapi.ResourceClaimConsumerReference) {
@ -572,7 +534,7 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc
claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "add finalizer", "claim", claim.Name)
claimAllocation.Error = fmt.Errorf("add finalizer: %v", err)
claimAllocation.Error = fmt.Errorf("add finalizer: %w", err)
// Do not save claim to ask for Allocate from Driver.
continue
}
@ -602,14 +564,14 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc
logger.V(5).Info("successfully allocated", "claim", klog.KObj(claimAllocation.Claim))
claim := claimAllocation.Claim.DeepCopy()
claim.Status.Allocation = claimAllocation.Allocation
claim.Status.DriverName = ctrl.name
claim.Status.Allocation.Controller = ctrl.name
if selectedUser != nil && ctrl.setReservedFor {
claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser)
}
logger.V(6).Info("Updating claim after allocation", "claim", claim)
claim, err := ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
if err != nil {
claimAllocation.Error = fmt.Errorf("add allocation: %v", err)
claimAllocation.Error = fmt.Errorf("add allocation: %w", err)
continue
}
@ -619,7 +581,7 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc
}
func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) {
claimName, mustCheckOwner, err := ctrl.claimNameLookup.Name(pod, &podClaim)
claimName, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim)
if err != nil {
return nil, err
}
@ -642,26 +604,30 @@ func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim
// need to be done for the claim either.
return nil, nil
}
class, err := ctrl.rcLister.Get(claim.Spec.ResourceClassName)
if err != nil {
return nil, err
}
if class.DriverName != ctrl.name {
if claim.Spec.Controller != ctrl.name {
return nil, nil
}
// Check parameters. Record event to claim and pod if parameters are invalid.
claimParameters, classParameters, err := ctrl.getParameters(ctx, claim, class, true)
if err != nil {
ctrl.eventRecorder.Event(pod, v1.EventTypeWarning, "Failed", fmt.Sprintf("claim %v: %v", claim.Name, err.Error()))
return nil, err
// Sanity checks and preparations...
ca := &ClaimAllocation{
PodClaimName: podClaim.Name,
Claim: claim,
DeviceClasses: make(map[string]*resourceapi.DeviceClass),
}
return &ClaimAllocation{
PodClaimName: podClaim.Name,
Claim: claim,
Class: class,
ClaimParameters: claimParameters,
ClassParameters: classParameters,
}, nil
for _, request := range claim.Spec.Devices.Requests {
if request.DeviceClassName == "" {
// Some unknown request. Abort!
return nil, fmt.Errorf("claim %s: unknown request type in request %s", klog.KObj(claim), request.Name)
}
deviceClassName := request.DeviceClassName
class, err := ctrl.dcLister.Get(deviceClassName)
if err != nil {
return nil, fmt.Errorf("claim %s: request %s: class %s: %w", klog.KObj(claim), request.Name, deviceClassName, err)
}
ca.DeviceClasses[deviceClassName] = class
}
return ca, nil
}
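For reference, a claim that passes these checks now names the controller directly and references a device class per request. A sketch with the driver and class names taken from the test below and a placeholder object name:

claim := &resourceapi.ResourceClaim{
    ObjectMeta: metav1.ObjectMeta{Name: "my-claim", Namespace: "default"},
    Spec: resourceapi.ResourceClaimSpec{
        Controller: "mock-driver", // must match the name passed to New
        Devices: resourceapi.DeviceClaim{
            Requests: []resourceapi.DeviceRequest{
                {Name: "gpu", DeviceClassName: "mock-class"},
            },
        },
    },
}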
// syncPodSchedulingContext determines which next action may be needed for a PodSchedulingContext object
@ -709,7 +675,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
for _, podClaim := range pod.Spec.ResourceClaims {
delayed, err := ctrl.checkPodClaim(ctx, pod, podClaim)
if err != nil {
return fmt.Errorf("pod claim %s: %v", podClaim.Name, err)
return fmt.Errorf("pod claim %s: %w", podClaim.Name, err)
}
if delayed == nil {
// Nothing to do for it. This can change, so keep checking.
@ -739,7 +705,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
}
if len(schedulingCtx.Spec.PotentialNodes) > 0 {
if err := ctrl.driver.UnsuitableNodes(ctx, pod, claims, potentialNodes); err != nil {
return fmt.Errorf("checking potential nodes: %v", err)
return fmt.Errorf("checking potential nodes: %w", err)
}
}
logger.V(5).Info("pending pod claims", "claims", claims, "selectedNode", selectedNode)
@ -772,7 +738,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
allErrors = append(allErrors, delayed.Error)
} else {
// Include claim name, it's not in the underlying error.
allErrors = append(allErrors, fmt.Errorf("claim %s: %v", delayed.Claim.Name, delayed.Error))
allErrors = append(allErrors, fmt.Errorf("claim %s: %w", delayed.Claim.Name, delayed.Error))
}
}
}
@ -807,7 +773,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin
if modified {
logger.V(6).Info("Updating pod scheduling with modified unsuitable nodes", "podSchedulingCtx", schedulingCtx)
if _, err := ctrl.kubeClient.ResourceV1alpha3().PodSchedulingContexts(schedulingCtx.Namespace).UpdateStatus(ctx, schedulingCtx, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("update unsuitable node status: %v", err)
return fmt.Errorf("update unsuitable node status: %w", err)
}
}


@ -44,15 +44,10 @@ func TestController(t *testing.T) {
driverName := "mock-driver"
className := "mock-class"
otherDriverName := "other-driver"
otherClassName := "other-class"
ourFinalizer := driverName + "/deletion-protection"
otherFinalizer := otherDriverName + "/deletion-protection"
classes := []*resourceapi.ResourceClass{
createClass(className, driverName),
createClass(otherClassName, otherDriverName),
}
claim := createClaim(claimName, claimNamespace, className)
otherClaim := createClaim(claimName, claimNamespace, otherClassName)
claim := createClaim(claimName, claimNamespace, driverName)
otherClaim := createClaim(claimName, claimNamespace, otherDriverName)
podName := "pod"
podKey := "schedulingCtx:default/pod"
pod := createPod(podName, claimNamespace, nil)
@ -87,12 +82,11 @@ func TestController(t *testing.T) {
claim.Finalizers = append(claim.Finalizers, finalizer)
return claim
}
allocation := resourceapi.AllocationResult{}
allocation := resourceapi.AllocationResult{Controller: driverName}
withAllocate := func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
// Any allocated claim must have our finalizer.
claim = withFinalizer(claim, ourFinalizer)
claim.Status.Allocation = &allocation
claim.Status.DriverName = driverName
return claim
}
withDeallocate := func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim {
@ -128,7 +122,6 @@ func TestController(t *testing.T) {
for name, test := range map[string]struct {
key string
driver mockDriver
classes []*resourceapi.ResourceClass
pod *corev1.Pod
schedulingCtx, expectedSchedulingCtx *resourceapi.PodSchedulingContext
claim, expectedClaim *resourceapi.ResourceClaim
@ -143,7 +136,6 @@ func TestController(t *testing.T) {
},
"wrong-driver": {
key: claimKey,
classes: classes,
claim: otherClaim,
expectedClaim: otherClaim,
},
@ -151,7 +143,6 @@ func TestController(t *testing.T) {
// not deleted, reallocate -> deallocate
"immediate-allocated-reallocate": {
key: claimKey,
classes: classes,
claim: withDeallocate(withAllocate(claim)),
driver: m.expectDeallocate(map[string]error{claimName: nil}),
expectedClaim: claim,
@ -160,7 +151,6 @@ func TestController(t *testing.T) {
// not deleted, reallocate, deallocate failure -> requeue
"immediate-allocated-fail-deallocation-during-reallocate": {
key: claimKey,
classes: classes,
claim: withDeallocate(withAllocate(claim)),
driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}),
expectedClaim: withDeallocate(withAllocate(claim)),
@ -170,7 +160,6 @@ func TestController(t *testing.T) {
// deletion time stamp set, our finalizer set, not allocated -> remove finalizer
"deleted-finalizer-removal": {
key: claimKey,
classes: classes,
claim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer),
driver: m.expectDeallocate(map[string]error{claimName: nil}),
expectedClaim: withDeletionTimestamp(claim),
@ -178,7 +167,6 @@ func TestController(t *testing.T) {
// deletion time stamp set, our finalizer set, not allocated, stopping fails -> requeue
"deleted-finalizer-stop-failure": {
key: claimKey,
classes: classes,
claim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer),
driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}),
expectedClaim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer),
@ -187,14 +175,12 @@ func TestController(t *testing.T) {
// deletion time stamp set, other finalizer set, not allocated -> do nothing
"deleted-finalizer-no-removal": {
key: claimKey,
classes: classes,
claim: withFinalizer(withDeletionTimestamp(claim), otherFinalizer),
expectedClaim: withFinalizer(withDeletionTimestamp(claim), otherFinalizer),
},
// deletion time stamp set, finalizer set, allocated -> deallocate
"deleted-allocated": {
key: claimKey,
classes: classes,
claim: withAllocate(withDeletionTimestamp(claim)),
driver: m.expectDeallocate(map[string]error{claimName: nil}),
expectedClaim: withDeletionTimestamp(claim),
@ -202,7 +188,6 @@ func TestController(t *testing.T) {
// deletion time stamp set, finalizer set, allocated, deallocation fails -> requeue
"deleted-deallocate-failure": {
key: claimKey,
classes: classes,
claim: withAllocate(withDeletionTimestamp(claim)),
driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}),
expectedClaim: withAllocate(withDeletionTimestamp(claim)),
@ -211,14 +196,12 @@ func TestController(t *testing.T) {
// deletion time stamp set, finalizer not set -> do nothing
"deleted-no-finalizer": {
key: claimKey,
classes: classes,
claim: withDeletionTimestamp(claim),
expectedClaim: withDeletionTimestamp(claim),
},
// waiting for first consumer -> do nothing
"pending": {
key: claimKey,
classes: classes,
claim: claim,
expectedClaim: claim,
},
@ -235,7 +218,6 @@ func TestController(t *testing.T) {
// no potential nodes -> shouldn't occur
"no-nodes": {
key: podKey,
classes: classes,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@ -246,7 +228,6 @@ func TestController(t *testing.T) {
// potential nodes -> provide unsuitable nodes
"info": {
key: podKey,
classes: classes,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@ -258,21 +239,9 @@ func TestController(t *testing.T) {
expectedError: errPeriodic.Error(),
},
// potential nodes, selected node, missing class -> failure
"missing-class": {
key: podKey,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
schedulingCtx: withSelectedNode(withPotentialNodes(podSchedulingCtx)),
expectedSchedulingCtx: withSelectedNode(withPotentialNodes(podSchedulingCtx)),
expectedError: `pod claim my-pod-claim: resourceclass.resource.k8s.io "mock-class" not found`,
},
// potential nodes, selected node -> allocate
"allocate": {
key: podKey,
classes: classes,
claim: claim,
expectedClaim: withReservedFor(withAllocate(claim), pod),
pod: podWithClaim,
@ -287,7 +256,6 @@ func TestController(t *testing.T) {
// potential nodes, selected node, all unsuitable -> update unsuitable nodes
"is-potential-node": {
key: podKey,
classes: classes,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@ -301,7 +269,6 @@ func TestController(t *testing.T) {
// max potential nodes, other selected node, all unsuitable -> update unsuitable nodes with truncation at start
"is-potential-node-truncate-first": {
key: podKey,
classes: classes,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@ -315,7 +282,6 @@ func TestController(t *testing.T) {
// max potential nodes, other selected node, all unsuitable (but in reverse order) -> update unsuitable nodes with truncation at end
"pod-selected-is-potential-node-truncate-last": {
key: podKey,
classes: classes,
claim: claim,
expectedClaim: claim,
pod: podWithClaim,
@ -332,9 +298,6 @@ func TestController(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
initialObjects := []runtime.Object{}
for _, class := range test.classes {
initialObjects = append(initialObjects, class)
}
if test.pod != nil {
initialObjects = append(initialObjects, test.pod)
}
@ -345,7 +308,6 @@ func TestController(t *testing.T) {
initialObjects = append(initialObjects, test.claim)
}
kubeClient, informerFactory := fakeK8s(initialObjects)
rcInformer := informerFactory.Resource().V1alpha3().ResourceClasses()
claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims()
podInformer := informerFactory.Core().V1().Pods()
podSchedulingInformer := informerFactory.Resource().V1alpha3().PodSchedulingContexts()
@ -356,8 +318,6 @@ func TestController(t *testing.T) {
for _, obj := range initialObjects {
switch obj.(type) {
case *resourceapi.ResourceClass:
require.NoError(t, rcInformer.Informer().GetStore().Add(obj), "add resource class")
case *resourceapi.ResourceClaim:
require.NoError(t, claimInformer.Informer().GetStore().Add(obj), "add resource claim")
case *corev1.Pod:
@ -375,7 +335,6 @@ func TestController(t *testing.T) {
ctrl := New(ctx, driverName, driver, kubeClient, informerFactory)
informerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(),
informerFactory.Resource().V1alpha3().ResourceClasses().Informer().HasSynced,
informerFactory.Resource().V1alpha3().ResourceClaims().Informer().HasSynced,
informerFactory.Resource().V1alpha3().PodSchedulingContexts().Informer().HasSynced,
) {
@ -459,30 +418,6 @@ func (m mockDriver) expectUnsuitableNodes(expected map[string][]string, err erro
return m
}
func (m mockDriver) GetClassParameters(ctx context.Context, class *resourceapi.ResourceClass) (interface{}, error) {
m.t.Logf("GetClassParameters(%s)", class)
result, ok := m.classParameters[class.Name]
if !ok {
m.t.Fatal("unexpected GetClassParameters call")
}
if err, ok := result.(error); ok {
return nil, err
}
return result, nil
}
func (m mockDriver) GetClaimParameters(ctx context.Context, claim *resourceapi.ResourceClaim, class *resourceapi.ResourceClass, classParameters interface{}) (interface{}, error) {
m.t.Logf("GetClaimParameters(%s)", claim)
result, ok := m.claimParameters[claim.Name]
if !ok {
m.t.Fatal("unexpected GetClaimParameters call")
}
if err, ok := result.(error); ok {
return nil, err
}
return result, nil
}
func (m mockDriver) Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string) {
m.t.Logf("Allocate(number of claims %d)", len(claims))
for _, claimAllocation := range claims {
@ -532,23 +467,14 @@ func (m mockDriver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims
return nil
}
func createClass(className, driverName string) *resourceapi.ResourceClass {
return &resourceapi.ResourceClass{
ObjectMeta: metav1.ObjectMeta{
Name: className,
},
DriverName: driverName,
}
}
func createClaim(claimName, claimNamespace, className string) *resourceapi.ResourceClaim {
func createClaim(claimName, claimNamespace, driverName string) *resourceapi.ResourceClaim {
return &resourceapi.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{
Name: claimName,
Namespace: claimNamespace,
},
Spec: resourceapi.ResourceClaimSpec{
ResourceClassName: className,
Controller: driverName,
},
}
}


@ -54,8 +54,8 @@ type DRAPlugin interface {
// after it returns before all information is actually written
// to the API server.
//
// The caller must not modify the content of the slice.
PublishResources(ctx context.Context, nodeResources []*resourceapi.ResourceModel)
// The caller must not modify the content after the call.
PublishResources(ctx context.Context, resources Resources)
// This unexported method ensures that we can modify the interface
// without causing an API break of the package
@ -63,6 +63,12 @@ type DRAPlugin interface {
internal()
}
// Resources currently only supports devices. Might get extended in the
// future.
type Resources struct {
Devices []resourceapi.Device
}
// Option implements the functional options pattern for Start.
type Option func(o *options) error
@ -360,7 +366,7 @@ func (d *draPlugin) Stop() {
}
// PublishResources implements [DRAPlugin.PublishResources].
func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resourceapi.ResourceModel) {
func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) {
d.mutex.Lock()
defer d.mutex.Unlock()
@ -370,7 +376,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resou
Name: d.nodeName,
UID: d.nodeUID, // Optional, will be determined by controller if empty.
}
resources := &resourceslice.Resources{NodeResources: nodeResources}
driverResources := &resourceslice.DriverResources{
Pools: map[string]resourceslice.Pool{
d.nodeName: {
Devices: resources.Devices,
},
},
}
if d.resourceSliceController == nil {
// Start publishing the information. The controller is using
// our background context, not the one passed into this
@ -380,12 +392,12 @@ func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resou
controllerLogger := klog.FromContext(controllerCtx)
controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller")
controllerCtx = klog.NewContext(controllerCtx, controllerLogger)
d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, resources)
d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources)
return
}
// Inform running controller about new information.
d.resourceSliceController.Update(resources)
d.resourceSliceController.Update(driverResources)
}
// RegistrationStatus implements [DRAPlugin.RegistrationStatus].


@ -26,15 +26,10 @@ package resourceclaim
import (
"errors"
"fmt"
"os"
"strings"
"sync/atomic"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/ptr"
)
var (
@ -81,78 +76,6 @@ func Name(pod *v1.Pod, podClaim *v1.PodResourceClaim) (name *string, mustCheckOw
}
}
// NewNameLookup returns an object which handles determining the name of
// a ResourceClaim. In contrast to the stand-alone Name it is compatible
// also with Kubernetes < 1.28.
//
// Providing a client is optional. If none is available, then code can pass nil
// and users can set the DRA_WITH_DETERMINISTIC_RESOURCE_CLAIM_NAMES env
// variable to an arbitrary non-empty value to use the naming from Kubernetes <
// 1.28.
func NewNameLookup(client kubernetes.Interface) *Lookup {
return &Lookup{client: client}
}
// Lookup stores the state which is necessary to look up ResourceClaim names.
type Lookup struct {
client kubernetes.Interface
usePodStatus atomic.Pointer[bool]
}
// Name is a variant of the stand-alone Name with support also for Kubernetes < 1.28.
func (l *Lookup) Name(pod *v1.Pod, podClaim *v1.PodResourceClaim) (name *string, mustCheckOwner bool, err error) {
usePodStatus := l.usePodStatus.Load()
if usePodStatus == nil {
if value, _ := os.LookupEnv("DRA_WITH_DETERMINISTIC_RESOURCE_CLAIM_NAMES"); value != "" {
usePodStatus = ptr.To(false)
} else if l.client != nil {
// Check once. This does not detect upgrades or
// downgrades, but that is good enough for the simple
// test scenarios that the Kubernetes < 1.28 support is
// meant for.
info, err := l.client.Discovery().ServerVersion()
if err != nil {
return nil, false, fmt.Errorf("look up server version: %v", err)
}
if info.Major == "" {
// Fake client...
usePodStatus = ptr.To(true)
} else {
switch strings.Compare(info.Major, "1") {
case -1:
// Huh?
usePodStatus = ptr.To(false)
case 0:
// info.Minor may have a suffix which makes it larger than 28.
// We don't care about pre-releases here.
usePodStatus = ptr.To(strings.Compare("28", info.Minor) <= 0)
case 1:
// Kubernetes 2? Yeah!
usePodStatus = ptr.To(true)
}
}
} else {
// No information. Let's assume recent Kubernetes.
usePodStatus = ptr.To(true)
}
l.usePodStatus.Store(usePodStatus)
}
if *usePodStatus {
return Name(pod, podClaim)
}
switch {
case podClaim.ResourceClaimName != nil:
return podClaim.ResourceClaimName, false, nil
case podClaim.ResourceClaimTemplateName != nil:
name := pod.Name + "-" + podClaim.Name
return &name, true, nil
default:
return nil, false, fmt.Errorf(`pod "%s/%s", spec.resourceClaim %q: %w`, pod.Namespace, pod.Name, podClaim.Name, ErrAPIUnsupported)
}
}
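Callers that previously went through NewNameLookup now call the plain Name helper directly. A sketch of the typical loop, with error handling trimmed and claimLister standing in for whatever ResourceClaim lister the caller already has:

for _, podClaim := range pod.Spec.ResourceClaims {
    name, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim)
    if err != nil || name == nil {
        continue // unsupported entry or claim not generated yet
    }
    claim, err := claimLister.ResourceClaims(pod.Namespace).Get(*name)
    if err != nil {
        continue
    }
    if mustCheckOwner {
        if err := resourceclaim.IsForPod(pod, claim); err != nil {
            continue
        }
    }
    // ... use claim ...
}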
// IsForPod checks that the ResourceClaim is the one that
// was created for the Pod. It returns an error that is informative
// enough to be returned by the caller without adding further details
@ -183,9 +106,3 @@ func CanBeReserved(claim *resourceapi.ResourceClaim) bool {
// Currently no restrictions on sharing...
return true
}
// IsAllocatedWithStructuredParameters checks whether the claim is allocated
// and the allocation was done with structured parameters.
func IsAllocatedWithStructuredParameters(claim *resourceapi.ResourceClaim) bool {
return claim.Status.Allocation != nil && claim.Status.Allocation.Controller == ""
}


@ -25,13 +25,13 @@ import (
"github.com/google/go-cmp/cmp"
"k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
resourceinformers "k8s.io/client-go/informers/resource/v1alpha3"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
@ -46,18 +46,17 @@ const (
resyncPeriod = time.Duration(10 * time.Minute)
)
// Controller synchronizes information about resources of one
// driver with ResourceSlice objects. It currently supports node-local
// Controller synchronizes information about resources of one driver with
// ResourceSlice objects. It supports node-local and network-attached
// resources. A DRA driver for node-local resources typically runs this
// controller as part of its kubelet plugin.
//
// Support for network-attached resources will be added later.
type Controller struct {
cancel func(cause error)
driverName string
driver string
owner Owner
kubeClient kubernetes.Interface
wg sync.WaitGroup
// The queue is keyed with the pool name that needs work.
queue workqueue.TypedRateLimitingInterface[string]
sliceStore cache.Store
@ -66,15 +65,32 @@ type Controller struct {
// When receiving updates from the driver, the entire pointer replaced,
// so it is okay to not do a deep copy of it when reading it. Only reading
// the pointer itself must be protected by a read lock.
resources *Resources
resources *DriverResources
}
// Resources is a complete description of all resources synchronized by the controller.
type Resources struct {
// NodeResources are resources that are local to one node.
NodeResources []*resourceapi.ResourceModel
// DriverResources is a complete description of all resources synchronized by the controller.
type DriverResources struct {
// Each driver may manage different resource pools.
Pools map[string]Pool
}
// Pool is the collection of devices belonging to the same pool.
type Pool struct {
// NodeSelector may be different for each pool. Must not get set together
// with Resources.NodeName. If nil and Resources.NodeName is not set,
// then devices are available on all nodes.
NodeSelector *v1.NodeSelector
// Generation can be left at zero. It gets bumped up automatically
// by the controller.
Generation int64
// Device names must be unique inside the pool.
Devices []resourceapi.Device
}
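The Pools map plus NodeSelector is what enables network-attached devices. A sketch of how a central (non-kubelet) driver controller might publish such a pool; the pool name, label key, driver name and owner value are assumptions:

driverResources := &resourceslice.DriverResources{
    Pools: map[string]resourceslice.Pool{
        "rack-1": {
            NodeSelector: &v1.NodeSelector{
                NodeSelectorTerms: []v1.NodeSelectorTerm{{
                    MatchExpressions: []v1.NodeSelectorRequirement{{
                        Key:      "topology.example.com/rack",
                        Operator: v1.NodeSelectorOpIn,
                        Values:   []string{"rack-1"},
                    }},
                }},
            },
            Devices: []resourceapi.Device{{Name: "nic-0"}},
        },
    },
}
// owner is a resourceslice.Owner identifying the object that should own the slices.
c := resourceslice.StartController(ctx, kubeClient, "driver.example.com", owner, driverResources)
defer c.Stop()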
// Owner is the resource which is meant to be listed as owner of the resource slices.
// For a node the UID may be left blank. The controller will look it up automatically.
type Owner struct {
APIVersion string
Kind string
@ -93,7 +109,7 @@ type Owner struct {
// the controller is inactive. This can happen when kubelet is run stand-alone
// without an apiserver. In that case we can't and don't need to publish
// ResourceSlices.
func StartController(ctx context.Context, kubeClient kubernetes.Interface, driverName string, owner Owner, resources *Resources) *Controller {
func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller {
if kubeClient == nil {
return nil
}
@ -104,7 +120,7 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive
c := &Controller{
cancel: cancel,
kubeClient: kubeClient,
driverName: driverName,
driver: driver,
owner: owner,
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
@ -121,8 +137,10 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive
c.run(ctx)
}()
// Sync once.
c.queue.Add("")
// Sync each pool once.
for poolName := range resources.Pools {
c.queue.Add(poolName)
}
return c
}
@ -137,12 +155,24 @@ func (c *Controller) Stop() {
}
// Update sets the new desired state of the resource information.
func (c *Controller) Update(resources *Resources) {
//
// The controller takes over ownership, so these resources must
// not get modified after this method returns.
func (c *Controller) Update(resources *DriverResources) {
c.mutex.Lock()
defer c.mutex.Unlock()
// Sync all old pools..
for poolName := range c.resources.Pools {
c.queue.Add(poolName)
}
c.resources = resources
c.queue.Add("")
// ... and the new ones (might be the same).
for poolName := range c.resources.Pools {
c.queue.Add(poolName)
}
}
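A driver that regenerates its resource description simply hands the controller a fresh DriverResources; pools that no longer appear in the map have their slices deleted on the next sync. Minimal sketch, with c being the controller returned by StartController and the pool contents made up:

c.Update(&resourceslice.DriverResources{
    Pools: map[string]resourceslice.Pool{
        "rack-1": {Devices: []resourceapi.Device{{Name: "nic-0"}, {Name: "nic-1"}}},
    },
})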
// run is running in the background. It handles blocking initialization (like
@ -151,9 +181,9 @@ func (c *Controller) run(ctx context.Context) {
logger := klog.FromContext(ctx)
// We always filter by driver name, by node name only for node-local resources.
selector := fields.Set{"driverName": c.driverName}
selector := fields.Set{resourceapi.ResourceSliceSelectorDriver: c.driver}
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
selector["nodeName"] = c.owner.Name
selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name
}
informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
options.FieldSelector = selector.String()
@ -166,7 +196,7 @@ func (c *Controller) run(ctx context.Context) {
return
}
logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
c.queue.Add("")
c.queue.Add(slice.Spec.Pool.Name)
},
UpdateFunc: func(old, new any) {
oldSlice, ok := old.(*resourceapi.ResourceSlice)
@ -182,7 +212,8 @@ func (c *Controller) run(ctx context.Context) {
} else {
logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
}
c.queue.Add("")
c.queue.Add(oldSlice.Spec.Pool.Name)
c.queue.Add(newSlice.Spec.Pool.Name)
},
DeleteFunc: func(obj any) {
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
@ -193,7 +224,7 @@ func (c *Controller) run(ctx context.Context) {
return
}
logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
c.queue.Add("")
c.queue.Add(slice.Spec.Pool.Name)
},
})
if err != nil {
@ -219,16 +250,19 @@ func (c *Controller) run(ctx context.Context) {
}
logger.V(3).Info("ResourceSlice informer has synced")
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
key, shutdown := c.queue.Get()
poolName, shutdown := c.queue.Get()
if shutdown {
return false
}
defer c.queue.Done(key)
defer c.queue.Done(poolName)
logger := klog.FromContext(ctx)
// Panics are caught and treated like errors.
var err error
@ -238,154 +272,157 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool {
err = fmt.Errorf("internal error: %v", r)
}
}()
err = c.sync(ctx)
err = c.syncPool(klog.NewContext(ctx, klog.LoggerWithValues(logger, "poolName", poolName)), poolName)
}()
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "processing ResourceSlice objects")
c.queue.AddRateLimited(key)
c.queue.AddRateLimited(poolName)
// Return without removing the work item from the queue.
// It will be retried.
return true
}
c.queue.Forget(key)
c.queue.Forget(poolName)
return true
}
func (c *Controller) sync(ctx context.Context) error {
// syncPool processes one pool. Only runs inside a single worker, so there
// is no need for locking except when accessing c.resources, which may
// be updated at any time by the user of the controller.
func (c *Controller) syncPool(ctx context.Context, poolName string) error {
logger := klog.FromContext(ctx)
// Gather information about the actual and desired state.
slices := c.sliceStore.List()
var resources *Resources
// TODO: index by pool name.
var slices []*resourceapi.ResourceSlice
for _, obj := range c.sliceStore.List() {
if slice, ok := obj.(*resourceapi.ResourceSlice); ok && slice.Spec.Pool.Name == poolName {
slices = append(slices, slice)
}
}
var resources *DriverResources
c.mutex.RLock()
resources = c.resources
c.mutex.RUnlock()
// Resources that are not yet stored in any slice need to be published.
// Here we track the indices of any resources that are already stored.
storedResourceIndices := sets.New[int]()
// Retrieve node object to get UID?
// The result gets cached and is expected to not change while
// the controller runs.
var nodeName string
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
nodeName = c.owner.Name
if c.owner.UID == "" {
node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("retrieve node %q: %w", c.owner.Name, err)
}
// There is only one worker, so no locking needed.
c.owner.UID = node.UID
}
}
// Slices that don't match any driver resource can either be updated (if there
// are new driver resources that need to be stored) or they need to be deleted.
obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))
// Match slices with resource information.
for _, obj := range slices {
slice := obj.(*resourceapi.ResourceSlice)
// TODO: network-attached resources.
index := indexOfModel(resources.NodeResources, &slice.ResourceModel)
if index >= 0 {
storedResourceIndices.Insert(index)
continue
// Determine highest generation.
var generation int64
for _, slice := range slices {
if slice.Spec.Pool.Generation > generation {
generation = slice.Spec.Pool.Generation
}
obsoleteSlices = append(obsoleteSlices, slice)
}
if loggerV := logger.V(6); loggerV.Enabled() {
// Dump entire resource information.
loggerV.Info("Syncing existing driver resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", resources)
} else {
logger.V(5).Info("Syncing existing driver resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(resources.NodeResources))
}
// Retrieve node object to get UID?
// The result gets cached and is expected to not change while
// the controller runs.
if c.owner.UID == "" && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("retrieve node %q: %w", c.owner.Name, err)
// Everything older is obsolete.
currentSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))
for _, slice := range slices {
if slice.Spec.Pool.Generation < generation {
obsoleteSlices = append(obsoleteSlices, slice)
} else {
currentSlices = append(currentSlices, slice)
}
// There is only one worker, so no locking needed.
c.owner.UID = node.UID
}
slices = currentSlices
// Update stale slices before removing what's left.
//
// We don't really know which of these slices might have
// been used for "the" driver resource because they don't
// have a unique ID. In practice, a driver is most likely
// to just give us one ResourceModel, in which case
// this isn't a problem at all. If we have more than one,
// then at least conceptually it currently doesn't matter
// where we publish it.
//
// The long-term goal is to move the handling of
// ResourceSlice objects into the driver, with kubelet
// just acting as a REST proxy. The advantage of that will
// be that kubelet won't need to support the same
// resource API version as the driver and the control plane.
// With that approach, the driver will be able to match
// up objects more intelligently.
numObsoleteSlices := len(obsoleteSlices)
for index, resource := range resources.NodeResources {
if storedResourceIndices.Has(index) {
// No need to do anything, it is already stored exactly
// like this in an existing slice.
continue
if pool, ok := resources.Pools[poolName]; ok {
if pool.Generation > generation {
generation = pool.Generation
}
if numObsoleteSlices > 0 {
// Update one existing slice.
slice := obsoleteSlices[numObsoleteSlices-1]
numObsoleteSlices--
slice = slice.DeepCopy()
slice.ResourceModel = *resource
logger.V(5).Info("Reusing existing resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("update resource slice: %w", err)
// Right now all devices get published in a single slice.
// We simply pick the first one, if there is one, and copy
// it in preparation for updating it.
//
// TODO: support splitting across slices, with unit tests.
if len(slices) > 0 {
obsoleteSlices = append(obsoleteSlices, slices[1:]...)
slices = []*resourceapi.ResourceSlice{slices[0].DeepCopy()}
} else {
slices = []*resourceapi.ResourceSlice{
{
ObjectMeta: metav1.ObjectMeta{
GenerateName: c.owner.Name + "-" + c.driver + "-",
},
},
}
}
slice := slices[0]
slice.OwnerReferences = []metav1.OwnerReference{{
APIVersion: c.owner.APIVersion,
Kind: c.owner.Kind,
Name: c.owner.Name,
UID: c.owner.UID,
Controller: ptr.To(true),
}}
slice.Spec.Driver = c.driver
slice.Spec.Pool.Name = poolName
slice.Spec.Pool.Generation = generation
slice.Spec.Pool.ResourceSliceCount = 1
slice.Spec.NodeName = nodeName
slice.Spec.NodeSelector = pool.NodeSelector
slice.Spec.AllNodes = pool.NodeSelector == nil && nodeName == ""
slice.Spec.Devices = pool.Devices
if loggerV := logger.V(6); loggerV.Enabled() {
// Dump entire resource information.
loggerV.Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "pool", pool)
} else {
logger.V(5).Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
}
} else if len(slices) > 0 {
// All are obsolete, pool does not exist anymore.
logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices))
obsoleteSlices = append(obsoleteSlices, slices...)
}
// Remove stale slices.
for _, slice := range obsoleteSlices {
logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice))
if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("delete resource slice: %w", err)
}
}
// Create or update slices.
for _, slice := range slices {
if slice.UID == "" {
logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create resource slice: %w", err)
}
continue
}
// Create a new slice.
slice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{
GenerateName: c.owner.Name + "-" + c.driverName + "-",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: c.owner.APIVersion,
Kind: c.owner.Kind,
Name: c.owner.Name,
UID: c.owner.UID,
Controller: ptr.To(true),
},
},
},
DriverName: c.driverName,
ResourceModel: *resource,
}
if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" {
slice.NodeName = c.owner.Name
}
logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("create resource slice: %w", err)
}
}
// All remaining slices are truly orphaned.
for i := 0; i < numObsoleteSlices; i++ {
slice := obsoleteSlices[i]
logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice))
if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil {
// TODO: switch to SSA once unit testing supports it.
logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice))
if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("delete resource slice: %w", err)
}
}
return nil
}
func indexOfModel(models []*resourceapi.ResourceModel, model *resourceapi.ResourceModel) int {
for index, m := range models {
if apiequality.Semantic.DeepEqual(m, model) {
return index
}
}
return -1
}