diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go index f26fc083731..2a0ae0b2a79 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller.go @@ -62,20 +62,6 @@ type Controller interface { // Driver provides the actual allocation and deallocation operations. type Driver interface { - // GetClassParameters is called to retrieve the parameter object - // referenced by a class. The content should be validated now if - // possible. class.Parameters may be nil. - // - // The caller wraps the error to include the parameter reference. - GetClassParameters(ctx context.Context, class *resourceapi.ResourceClass) (interface{}, error) - - // GetClaimParameters is called to retrieve the parameter object - // referenced by a claim. The content should be validated now if - // possible. claim.Spec.Parameters may be nil. - // - // The caller wraps the error to include the parameter reference. - GetClaimParameters(ctx context.Context, claim *resourceapi.ResourceClaim, class *resourceapi.ResourceClass, classParameters interface{}) (interface{}, error) - // Allocate is called when all same-driver ResourceClaims for Pod are ready // to be allocated. The selectedNode is empty for ResourceClaims with immediate // allocation, in which case the resource driver decides itself where @@ -136,11 +122,9 @@ type Driver interface { // ClaimAllocation represents information about one particular // pod.Spec.ResourceClaim entry. type ClaimAllocation struct { - PodClaimName string - Claim *resourceapi.ResourceClaim - Class *resourceapi.ResourceClass - ClaimParameters interface{} - ClassParameters interface{} + PodClaimName string + Claim *resourceapi.ResourceClaim + DeviceClasses map[string]*resourceapi.DeviceClass // UnsuitableNodes needs to be filled in by the driver when // Driver.UnsuitableNodes gets called. 
@@ -162,15 +146,12 @@ type controller struct { driver Driver setReservedFor bool kubeClient kubernetes.Interface - claimNameLookup *resourceclaim.Lookup queue workqueue.TypedRateLimitingInterface[string] eventRecorder record.EventRecorder - rcLister resourcelisters.ResourceClassLister - rcSynced cache.InformerSynced + dcLister resourcelisters.DeviceClassLister claimCache cache.MutationCache schedulingCtxLister resourcelisters.PodSchedulingContextLister - claimSynced cache.InformerSynced - schedulingCtxSynced cache.InformerSynced + synced []cache.InformerSynced } // TODO: make it configurable @@ -184,10 +165,9 @@ func New( kubeClient kubernetes.Interface, informerFactory informers.SharedInformerFactory) Controller { logger := klog.LoggerWithName(klog.FromContext(ctx), "resource controller") - rcInformer := informerFactory.Resource().V1alpha3().ResourceClasses() + dcInformer := informerFactory.Resource().V1alpha3().DeviceClasses() claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims() schedulingCtxInformer := informerFactory.Resource().V1alpha3().PodSchedulingContexts() - claimNameLookup := resourceclaim.NewNameLookup(kubeClient) eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) go func() { @@ -228,15 +208,16 @@ func New( driver: driver, setReservedFor: true, kubeClient: kubeClient, - claimNameLookup: claimNameLookup, - rcLister: rcInformer.Lister(), - rcSynced: rcInformer.Informer().HasSynced, + dcLister: dcInformer.Lister(), claimCache: claimCache, - claimSynced: claimInformer.Informer().HasSynced, schedulingCtxLister: schedulingCtxInformer.Lister(), - schedulingCtxSynced: schedulingCtxInformer.Informer().HasSynced, queue: queue, eventRecorder: eventRecorder, + synced: []cache.InformerSynced{ + dcInformer.Informer().HasSynced, + claimInformer.Informer().HasSynced, + schedulingCtxInformer.Informer().HasSynced, + }, } loggerV6 := logger.V(6) @@ -341,7 +322,7 @@ func (ctrl *controller) Run(workers int) { stopCh := ctrl.ctx.Done() - if !cache.WaitForCacheSync(stopCh, ctrl.rcSynced, ctrl.claimSynced, ctrl.schedulingCtxSynced) { + if !cache.WaitForCacheSync(stopCh, ctrl.synced...) { ctrl.logger.Error(nil, "Cannot sync caches") return } @@ -471,20 +452,19 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour if claim.Status.Allocation != nil { // Allocation was completed. Deallocate before proceeding. if err := ctrl.driver.Deallocate(ctx, claim); err != nil { - return fmt.Errorf("deallocate: %v", err) + return fmt.Errorf("deallocate: %w", err) } claim.Status.Allocation = nil - claim.Status.DriverName = "" claim.Status.DeallocationRequested = false claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("remove allocation: %v", err) + return fmt.Errorf("remove allocation: %w", err) } ctrl.claimCache.Mutation(claim) } else { // Ensure that there is no on-going allocation. 
if err := ctrl.driver.Deallocate(ctx, claim); err != nil { - return fmt.Errorf("stop allocation: %v", err) + return fmt.Errorf("stop allocation: %w", err) } } @@ -493,7 +473,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour claim.Status.DeallocationRequested = false claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("remove deallocation: %v", err) + return fmt.Errorf("remove deallocation: %w", err) } ctrl.claimCache.Mutation(claim) } @@ -501,7 +481,7 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour claim.Finalizers = ctrl.removeFinalizer(claim.Finalizers) claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) if err != nil { - return fmt.Errorf("remove finalizer: %v", err) + return fmt.Errorf("remove finalizer: %w", err) } ctrl.claimCache.Mutation(claim) } @@ -519,24 +499,6 @@ func (ctrl *controller) syncClaim(ctx context.Context, claim *resourceapi.Resour return nil } -func (ctrl *controller) getParameters(ctx context.Context, claim *resourceapi.ResourceClaim, class *resourceapi.ResourceClass, notifyClaim bool) (claimParameters, classParameters interface{}, err error) { - classParameters, err = ctrl.driver.GetClassParameters(ctx, class) - if err != nil { - ctrl.eventRecorder.Event(class, v1.EventTypeWarning, "Failed", err.Error()) - err = fmt.Errorf("class parameters %s: %v", class.ParametersRef, err) - return - } - claimParameters, err = ctrl.driver.GetClaimParameters(ctx, claim, class, classParameters) - if err != nil { - if notifyClaim { - ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "Failed", err.Error()) - } - err = fmt.Errorf("claim parameters %s: %v", claim.Spec.ParametersRef, err) - return - } - return -} - // allocateClaims filters list of claims, keeps those needing allocation and asks driver to do the allocations. // Driver is supposed to write the AllocationResult and Error field into argument claims slice. func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAllocation, selectedNode string, selectedUser *resourceapi.ResourceClaimConsumerReference) { @@ -572,7 +534,7 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc claim, err = ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) if err != nil { logger.Error(err, "add finalizer", "claim", claim.Name) - claimAllocation.Error = fmt.Errorf("add finalizer: %v", err) + claimAllocation.Error = fmt.Errorf("add finalizer: %w", err) // Do not save claim to ask for Allocate from Driver. 
continue } @@ -602,14 +564,14 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc logger.V(5).Info("successfully allocated", "claim", klog.KObj(claimAllocation.Claim)) claim := claimAllocation.Claim.DeepCopy() claim.Status.Allocation = claimAllocation.Allocation - claim.Status.DriverName = ctrl.name + claim.Status.Allocation.Controller = ctrl.name if selectedUser != nil && ctrl.setReservedFor { claim.Status.ReservedFor = append(claim.Status.ReservedFor, *selectedUser) } logger.V(6).Info("Updating claim after allocation", "claim", claim) claim, err := ctrl.kubeClient.ResourceV1alpha3().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) if err != nil { - claimAllocation.Error = fmt.Errorf("add allocation: %v", err) + claimAllocation.Error = fmt.Errorf("add allocation: %w", err) continue } @@ -619,7 +581,7 @@ func (ctrl *controller) allocateClaims(ctx context.Context, claims []*ClaimAlloc } func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim) (*ClaimAllocation, error) { - claimName, mustCheckOwner, err := ctrl.claimNameLookup.Name(pod, &podClaim) + claimName, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim) if err != nil { return nil, err } @@ -642,26 +604,30 @@ func (ctrl *controller) checkPodClaim(ctx context.Context, pod *v1.Pod, podClaim // need to be done for the claim either. return nil, nil } - class, err := ctrl.rcLister.Get(claim.Spec.ResourceClassName) - if err != nil { - return nil, err - } - if class.DriverName != ctrl.name { + if claim.Spec.Controller != ctrl.name { return nil, nil } - // Check parameters. Record event to claim and pod if parameters are invalid. - claimParameters, classParameters, err := ctrl.getParameters(ctx, claim, class, true) - if err != nil { - ctrl.eventRecorder.Event(pod, v1.EventTypeWarning, "Failed", fmt.Sprintf("claim %v: %v", claim.Name, err.Error())) - return nil, err + + // Sanity checks and preparations... + ca := &ClaimAllocation{ + PodClaimName: podClaim.Name, + Claim: claim, + DeviceClasses: make(map[string]*resourceapi.DeviceClass), } - return &ClaimAllocation{ - PodClaimName: podClaim.Name, - Claim: claim, - Class: class, - ClaimParameters: claimParameters, - ClassParameters: classParameters, - }, nil + for _, request := range claim.Spec.Devices.Requests { + if request.DeviceClassName == "" { + // Some unknown request. Abort! + return nil, fmt.Errorf("claim %s: unknown request type in request %s", klog.KObj(claim), request.Name) + } + deviceClassName := request.DeviceClassName + class, err := ctrl.dcLister.Get(deviceClassName) + if err != nil { + return nil, fmt.Errorf("claim %s: request %s: class %s: %w", klog.KObj(claim), request.Name, deviceClassName, err) + } + ca.DeviceClasses[deviceClassName] = class + } + + return ca, nil } // syncPodSchedulingContext determines which next action may be needed for a PodSchedulingContext object @@ -709,7 +675,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin for _, podClaim := range pod.Spec.ResourceClaims { delayed, err := ctrl.checkPodClaim(ctx, pod, podClaim) if err != nil { - return fmt.Errorf("pod claim %s: %v", podClaim.Name, err) + return fmt.Errorf("pod claim %s: %w", podClaim.Name, err) } if delayed == nil { // Nothing to do for it. This can change, so keep checking. 
@@ -739,7 +705,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin } if len(schedulingCtx.Spec.PotentialNodes) > 0 { if err := ctrl.driver.UnsuitableNodes(ctx, pod, claims, potentialNodes); err != nil { - return fmt.Errorf("checking potential nodes: %v", err) + return fmt.Errorf("checking potential nodes: %w", err) } } logger.V(5).Info("pending pod claims", "claims", claims, "selectedNode", selectedNode) @@ -772,7 +738,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin allErrors = append(allErrors, delayed.Error) } else { // Include claim name, it's not in the underlying error. - allErrors = append(allErrors, fmt.Errorf("claim %s: %v", delayed.Claim.Name, delayed.Error)) + allErrors = append(allErrors, fmt.Errorf("claim %s: %w", delayed.Claim.Name, delayed.Error)) } } } @@ -807,7 +773,7 @@ func (ctrl *controller) syncPodSchedulingContexts(ctx context.Context, schedulin if modified { logger.V(6).Info("Updating pod scheduling with modified unsuitable nodes", "podSchedulingCtx", schedulingCtx) if _, err := ctrl.kubeClient.ResourceV1alpha3().PodSchedulingContexts(schedulingCtx.Namespace).UpdateStatus(ctx, schedulingCtx, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("update unsuitable node status: %v", err) + return fmt.Errorf("update unsuitable node status: %w", err) } } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go index 94f6a22384d..a8d2f24748d 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/controller/controller_test.go @@ -44,15 +44,10 @@ func TestController(t *testing.T) { driverName := "mock-driver" className := "mock-class" otherDriverName := "other-driver" - otherClassName := "other-class" ourFinalizer := driverName + "/deletion-protection" otherFinalizer := otherDriverName + "/deletion-protection" - classes := []*resourceapi.ResourceClass{ - createClass(className, driverName), - createClass(otherClassName, otherDriverName), - } - claim := createClaim(claimName, claimNamespace, className) - otherClaim := createClaim(claimName, claimNamespace, otherClassName) + claim := createClaim(claimName, claimNamespace, driverName) + otherClaim := createClaim(claimName, claimNamespace, otherDriverName) podName := "pod" podKey := "schedulingCtx:default/pod" pod := createPod(podName, claimNamespace, nil) @@ -87,12 +82,11 @@ func TestController(t *testing.T) { claim.Finalizers = append(claim.Finalizers, finalizer) return claim } - allocation := resourceapi.AllocationResult{} + allocation := resourceapi.AllocationResult{Controller: driverName} withAllocate := func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim { // Any allocated claim must have our finalizer. 
claim = withFinalizer(claim, ourFinalizer) claim.Status.Allocation = &allocation - claim.Status.DriverName = driverName return claim } withDeallocate := func(claim *resourceapi.ResourceClaim) *resourceapi.ResourceClaim { @@ -128,7 +122,6 @@ func TestController(t *testing.T) { for name, test := range map[string]struct { key string driver mockDriver - classes []*resourceapi.ResourceClass pod *corev1.Pod schedulingCtx, expectedSchedulingCtx *resourceapi.PodSchedulingContext claim, expectedClaim *resourceapi.ResourceClaim @@ -143,7 +136,6 @@ func TestController(t *testing.T) { }, "wrong-driver": { key: claimKey, - classes: classes, claim: otherClaim, expectedClaim: otherClaim, }, @@ -151,7 +143,6 @@ func TestController(t *testing.T) { // not deleted, reallocate -> deallocate "immediate-allocated-reallocate": { key: claimKey, - classes: classes, claim: withDeallocate(withAllocate(claim)), driver: m.expectDeallocate(map[string]error{claimName: nil}), expectedClaim: claim, @@ -160,7 +151,6 @@ func TestController(t *testing.T) { // not deleted, reallocate, deallocate failure -> requeue "immediate-allocated-fail-deallocation-during-reallocate": { key: claimKey, - classes: classes, claim: withDeallocate(withAllocate(claim)), driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}), expectedClaim: withDeallocate(withAllocate(claim)), @@ -170,7 +160,6 @@ func TestController(t *testing.T) { // deletion time stamp set, our finalizer set, not allocated -> remove finalizer "deleted-finalizer-removal": { key: claimKey, - classes: classes, claim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer), driver: m.expectDeallocate(map[string]error{claimName: nil}), expectedClaim: withDeletionTimestamp(claim), @@ -178,7 +167,6 @@ func TestController(t *testing.T) { // deletion time stamp set, our finalizer set, not allocated, stopping fails -> requeue "deleted-finalizer-stop-failure": { key: claimKey, - classes: classes, claim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer), driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}), expectedClaim: withFinalizer(withDeletionTimestamp(claim), ourFinalizer), @@ -187,14 +175,12 @@ func TestController(t *testing.T) { // deletion time stamp set, other finalizer set, not allocated -> do nothing "deleted-finalizer-no-removal": { key: claimKey, - classes: classes, claim: withFinalizer(withDeletionTimestamp(claim), otherFinalizer), expectedClaim: withFinalizer(withDeletionTimestamp(claim), otherFinalizer), }, // deletion time stamp set, finalizer set, allocated -> deallocate "deleted-allocated": { key: claimKey, - classes: classes, claim: withAllocate(withDeletionTimestamp(claim)), driver: m.expectDeallocate(map[string]error{claimName: nil}), expectedClaim: withDeletionTimestamp(claim), @@ -202,7 +188,6 @@ func TestController(t *testing.T) { // deletion time stamp set, finalizer set, allocated, deallocation fails -> requeue "deleted-deallocate-failure": { key: claimKey, - classes: classes, claim: withAllocate(withDeletionTimestamp(claim)), driver: m.expectDeallocate(map[string]error{claimName: errors.New("fake error")}), expectedClaim: withAllocate(withDeletionTimestamp(claim)), @@ -211,14 +196,12 @@ func TestController(t *testing.T) { // deletion time stamp set, finalizer not set -> do nothing "deleted-no-finalizer": { key: claimKey, - classes: classes, claim: withDeletionTimestamp(claim), expectedClaim: withDeletionTimestamp(claim), }, // waiting for first consumer -> do nothing "pending": { key: claimKey, 
- classes: classes, claim: claim, expectedClaim: claim, }, @@ -235,7 +218,6 @@ func TestController(t *testing.T) { // no potential nodes -> shouldn't occur "no-nodes": { key: podKey, - classes: classes, claim: claim, expectedClaim: claim, pod: podWithClaim, @@ -246,7 +228,6 @@ func TestController(t *testing.T) { // potential nodes -> provide unsuitable nodes "info": { key: podKey, - classes: classes, claim: claim, expectedClaim: claim, pod: podWithClaim, @@ -258,21 +239,9 @@ func TestController(t *testing.T) { expectedError: errPeriodic.Error(), }, - // potential nodes, selected node, missing class -> failure - "missing-class": { - key: podKey, - claim: claim, - expectedClaim: claim, - pod: podWithClaim, - schedulingCtx: withSelectedNode(withPotentialNodes(podSchedulingCtx)), - expectedSchedulingCtx: withSelectedNode(withPotentialNodes(podSchedulingCtx)), - expectedError: `pod claim my-pod-claim: resourceclass.resource.k8s.io "mock-class" not found`, - }, - // potential nodes, selected node -> allocate "allocate": { key: podKey, - classes: classes, claim: claim, expectedClaim: withReservedFor(withAllocate(claim), pod), pod: podWithClaim, @@ -287,7 +256,6 @@ func TestController(t *testing.T) { // potential nodes, selected node, all unsuitable -> update unsuitable nodes "is-potential-node": { key: podKey, - classes: classes, claim: claim, expectedClaim: claim, pod: podWithClaim, @@ -301,7 +269,6 @@ func TestController(t *testing.T) { // max potential nodes, other selected node, all unsuitable -> update unsuitable nodes with truncation at start "is-potential-node-truncate-first": { key: podKey, - classes: classes, claim: claim, expectedClaim: claim, pod: podWithClaim, @@ -315,7 +282,6 @@ func TestController(t *testing.T) { // max potential nodes, other selected node, all unsuitable (but in reverse order) -> update unsuitable nodes with truncation at end "pod-selected-is-potential-node-truncate-last": { key: podKey, - classes: classes, claim: claim, expectedClaim: claim, pod: podWithClaim, @@ -332,9 +298,6 @@ func TestController(t *testing.T) { ctx, cancel := context.WithCancel(ctx) initialObjects := []runtime.Object{} - for _, class := range test.classes { - initialObjects = append(initialObjects, class) - } if test.pod != nil { initialObjects = append(initialObjects, test.pod) } @@ -345,7 +308,6 @@ func TestController(t *testing.T) { initialObjects = append(initialObjects, test.claim) } kubeClient, informerFactory := fakeK8s(initialObjects) - rcInformer := informerFactory.Resource().V1alpha3().ResourceClasses() claimInformer := informerFactory.Resource().V1alpha3().ResourceClaims() podInformer := informerFactory.Core().V1().Pods() podSchedulingInformer := informerFactory.Resource().V1alpha3().PodSchedulingContexts() @@ -356,8 +318,6 @@ func TestController(t *testing.T) { for _, obj := range initialObjects { switch obj.(type) { - case *resourceapi.ResourceClass: - require.NoError(t, rcInformer.Informer().GetStore().Add(obj), "add resource class") case *resourceapi.ResourceClaim: require.NoError(t, claimInformer.Informer().GetStore().Add(obj), "add resource claim") case *corev1.Pod: @@ -375,7 +335,6 @@ func TestController(t *testing.T) { ctrl := New(ctx, driverName, driver, kubeClient, informerFactory) informerFactory.Start(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), - informerFactory.Resource().V1alpha3().ResourceClasses().Informer().HasSynced, informerFactory.Resource().V1alpha3().ResourceClaims().Informer().HasSynced, 
informerFactory.Resource().V1alpha3().PodSchedulingContexts().Informer().HasSynced, ) { @@ -459,30 +418,6 @@ func (m mockDriver) expectUnsuitableNodes(expected map[string][]string, err erro return m } -func (m mockDriver) GetClassParameters(ctx context.Context, class *resourceapi.ResourceClass) (interface{}, error) { - m.t.Logf("GetClassParameters(%s)", class) - result, ok := m.classParameters[class.Name] - if !ok { - m.t.Fatal("unexpected GetClassParameters call") - } - if err, ok := result.(error); ok { - return nil, err - } - return result, nil -} - -func (m mockDriver) GetClaimParameters(ctx context.Context, claim *resourceapi.ResourceClaim, class *resourceapi.ResourceClass, classParameters interface{}) (interface{}, error) { - m.t.Logf("GetClaimParameters(%s)", claim) - result, ok := m.claimParameters[claim.Name] - if !ok { - m.t.Fatal("unexpected GetClaimParameters call") - } - if err, ok := result.(error); ok { - return nil, err - } - return result, nil -} - func (m mockDriver) Allocate(ctx context.Context, claims []*ClaimAllocation, selectedNode string) { m.t.Logf("Allocate(number of claims %d)", len(claims)) for _, claimAllocation := range claims { @@ -532,23 +467,14 @@ func (m mockDriver) UnsuitableNodes(ctx context.Context, pod *corev1.Pod, claims return nil } -func createClass(className, driverName string) *resourceapi.ResourceClass { - return &resourceapi.ResourceClass{ - ObjectMeta: metav1.ObjectMeta{ - Name: className, - }, - DriverName: driverName, - } -} - -func createClaim(claimName, claimNamespace, className string) *resourceapi.ResourceClaim { +func createClaim(claimName, claimNamespace, driverName string) *resourceapi.ResourceClaim { return &resourceapi.ResourceClaim{ ObjectMeta: metav1.ObjectMeta{ Name: claimName, Namespace: claimNamespace, }, Spec: resourceapi.ResourceClaimSpec{ - ResourceClassName: className, + Controller: driverName, }, } } diff --git a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go index a7b04fac2bc..94b9ae959af 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go @@ -54,8 +54,8 @@ type DRAPlugin interface { // after it returns before all information is actually written // to the API server. // - // The caller must not modify the content of the slice. - PublishResources(ctx context.Context, nodeResources []*resourceapi.ResourceModel) + // The caller must not modify the content after the call. + PublishResources(ctx context.Context, resources Resources) // This unexported method ensures that we can modify the interface // without causing an API break of the package @@ -63,6 +63,12 @@ type DRAPlugin interface { internal() } +// Resources currently only supports devices. Might get extended in the +// future. +type Resources struct { + Devices []resourceapi.Device +} + // Option implements the functional options pattern for Start. type Option func(o *options) error @@ -360,7 +366,7 @@ func (d *draPlugin) Stop() { } // PublishResources implements [DRAPlugin.PublishResources]. 
-func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resourceapi.ResourceModel) { +func (d *draPlugin) PublishResources(ctx context.Context, resources Resources) { d.mutex.Lock() defer d.mutex.Unlock() @@ -370,7 +376,13 @@ func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resou Name: d.nodeName, UID: d.nodeUID, // Optional, will be determined by controller if empty. } - resources := &resourceslice.Resources{NodeResources: nodeResources} + driverResources := &resourceslice.DriverResources{ + Pools: map[string]resourceslice.Pool{ + d.nodeName: { + Devices: resources.Devices, + }, + }, + } if d.resourceSliceController == nil { // Start publishing the information. The controller is using // our background context, not the one passed into this @@ -380,12 +392,12 @@ func (d *draPlugin) PublishResources(ctx context.Context, nodeResources []*resou controllerLogger := klog.FromContext(controllerCtx) controllerLogger = klog.LoggerWithName(controllerLogger, "ResourceSlice controller") controllerCtx = klog.NewContext(controllerCtx, controllerLogger) - d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, resources) + d.resourceSliceController = resourceslice.StartController(controllerCtx, d.kubeClient, d.driverName, owner, driverResources) return } // Inform running controller about new information. - d.resourceSliceController.Update(resources) + d.resourceSliceController.Update(driverResources) } // RegistrationStatus implements [DRAPlugin.RegistrationStatus]. diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go index 11f820fcded..3d59e85709f 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceclaim/resourceclaim.go @@ -26,15 +26,10 @@ package resourceclaim import ( "errors" "fmt" - "os" - "strings" - "sync/atomic" v1 "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/utils/ptr" ) var ( @@ -81,78 +76,6 @@ func Name(pod *v1.Pod, podClaim *v1.PodResourceClaim) (name *string, mustCheckOw } } -// NewNameLookup returns an object which handles determining the name of -// a ResourceClaim. In contrast to the stand-alone Name it is compatible -// also with Kubernetes < 1.28. -// -// Providing a client is optional. If none is available, then code can pass nil -// and users can set the DRA_WITH_DETERMINISTIC_RESOURCE_CLAIM_NAMES env -// variable to an arbitrary non-empty value to use the naming from Kubernetes < -// 1.28. -func NewNameLookup(client kubernetes.Interface) *Lookup { - return &Lookup{client: client} -} - -// Lookup stores the state which is necessary to look up ResourceClaim names. -type Lookup struct { - client kubernetes.Interface - usePodStatus atomic.Pointer[bool] -} - -// Name is a variant of the stand-alone Name with support also for Kubernetes < 1.28. -func (l *Lookup) Name(pod *v1.Pod, podClaim *v1.PodResourceClaim) (name *string, mustCheckOwner bool, err error) { - usePodStatus := l.usePodStatus.Load() - if usePodStatus == nil { - if value, _ := os.LookupEnv("DRA_WITH_DETERMINISTIC_RESOURCE_CLAIM_NAMES"); value != "" { - usePodStatus = ptr.To(false) - } else if l.client != nil { - // Check once. 
This does not detect upgrades or - // downgrades, but that is good enough for the simple - // test scenarios that the Kubernetes < 1.28 support is - // meant for. - info, err := l.client.Discovery().ServerVersion() - if err != nil { - return nil, false, fmt.Errorf("look up server version: %v", err) - } - if info.Major == "" { - // Fake client... - usePodStatus = ptr.To(true) - } else { - switch strings.Compare(info.Major, "1") { - case -1: - // Huh? - usePodStatus = ptr.To(false) - case 0: - // info.Minor may have a suffix which makes it larger than 28. - // We don't care about pre-releases here. - usePodStatus = ptr.To(strings.Compare("28", info.Minor) <= 0) - case 1: - // Kubernetes 2? Yeah! - usePodStatus = ptr.To(true) - } - } - } else { - // No information. Let's assume recent Kubernetes. - usePodStatus = ptr.To(true) - } - l.usePodStatus.Store(usePodStatus) - } - - if *usePodStatus { - return Name(pod, podClaim) - } - - switch { - case podClaim.ResourceClaimName != nil: - return podClaim.ResourceClaimName, false, nil - case podClaim.ResourceClaimTemplateName != nil: - name := pod.Name + "-" + podClaim.Name - return &name, true, nil - default: - return nil, false, fmt.Errorf(`pod "%s/%s", spec.resourceClaim %q: %w`, pod.Namespace, pod.Name, podClaim.Name, ErrAPIUnsupported) - } -} - // IsForPod checks that the ResourceClaim is the one that // was created for the Pod. It returns an error that is informative // enough to be returned by the caller without adding further details @@ -183,9 +106,3 @@ func CanBeReserved(claim *resourceapi.ResourceClaim) bool { // Currently no restrictions on sharing... return true } - -// IsAllocatedWithStructuredParameters checks whether the claim is allocated -// and the allocation was done with structured parameters. -func IsAllocatedWithStructuredParameters(claim *resourceapi.ResourceClaim) bool { - return claim.Status.Allocation != nil && claim.Status.Allocation.Controller == "" -} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/noderesources.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go similarity index 53% rename from staging/src/k8s.io/dynamic-resource-allocation/resourceslice/noderesources.go rename to staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go index b70212523b3..01fd88be1ba 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/noderesources.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go @@ -25,13 +25,13 @@ import ( "github.com/google/go-cmp/cmp" + "k8s.io/api/core/v1" resourceapi "k8s.io/api/resource/v1alpha3" - apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" resourceinformers "k8s.io/client-go/informers/resource/v1alpha3" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -46,18 +46,17 @@ const ( resyncPeriod = time.Duration(10 * time.Minute) ) -// Controller synchronizes information about resources of one -// driver with ResourceSlice objects. It currently supports node-local +// Controller synchronizes information about resources of one driver with +// ResourceSlice objects. It supports node-local and network-attached // resources. 
A DRA driver for node-local resources typically runs this // controller as part of its kubelet plugin. -// -// Support for network-attached resources will be added later. type Controller struct { cancel func(cause error) - driverName string + driver string owner Owner kubeClient kubernetes.Interface wg sync.WaitGroup + // The queue is keyed with the pool name that needs work. queue workqueue.TypedRateLimitingInterface[string] sliceStore cache.Store @@ -66,15 +65,32 @@ type Controller struct { // When receiving updates from the driver, the entire pointer replaced, // so it is okay to not do a deep copy of it when reading it. Only reading // the pointer itself must be protected by a read lock. - resources *Resources + resources *DriverResources } -// Resources is a complete description of all resources synchronized by the controller. -type Resources struct { - // NodeResources are resources that are local to one node. - NodeResources []*resourceapi.ResourceModel +// DriverResources is a complete description of all resources synchronized by the controller. +type DriverResources struct { + // Each driver may manage different resource pools. + Pools map[string]Pool } +// Pool is the collection of devices belonging to the same pool. +type Pool struct { + // NodeSelector may be different for each pool. Must not get set for + // node-local pools, i.e. when the owner is a Node. If nil and the owner + // is not a Node, then devices are available on all nodes. + NodeSelector *v1.NodeSelector + + // Generation can be left at zero. It gets bumped up automatically + // by the controller. + Generation int64 + + // Device names must be unique inside the pool. + Devices []resourceapi.Device +} + +// Owner is the resource which is meant to be listed as owner of the resource slices. +// For a node the UID may be left blank. The controller will look it up automatically. type Owner struct { APIVersion string Kind string @@ -93,7 +109,7 @@ type Owner struct { // the controller is inactive. This can happen when kubelet is run stand-alone // without an apiserver. In that case we can't and don't need to publish // ResourceSlices. -func StartController(ctx context.Context, kubeClient kubernetes.Interface, driverName string, owner Owner, resources *Resources) *Controller { +func StartController(ctx context.Context, kubeClient kubernetes.Interface, driver string, owner Owner, resources *DriverResources) *Controller { if kubeClient == nil { return nil } @@ -104,7 +120,7 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive c := &Controller{ cancel: cancel, kubeClient: kubeClient, - driverName: driverName, + driver: driver, owner: owner, queue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), @@ -121,8 +137,10 @@ func StartController(ctx context.Context, kubeClient kubernetes.Interface, drive c.run(ctx) }() - // Sync once. - c.queue.Add("") + // Sync each pool once. + for poolName := range resources.Pools { + c.queue.Add(poolName) + } return c } @@ -137,12 +155,24 @@ func (c *Controller) Stop() { } // Update sets the new desired state of the resource information. -func (c *Controller) Update(resources *Resources) { +// +// The controller takes over ownership, so these resources must +// not get modified after this method returns. +func (c *Controller) Update(resources *DriverResources) { c.mutex.Lock() defer c.mutex.Unlock() + // Sync all old pools..
+ for poolName := range c.resources.Pools { + c.queue.Add(poolName) + } + c.resources = resources - c.queue.Add("") + + // ... and the new ones (might be the same). + for poolName := range c.resources.Pools { + c.queue.Add(poolName) + } } // run is running in the background. It handles blocking initialization (like @@ -151,9 +181,9 @@ func (c *Controller) run(ctx context.Context) { logger := klog.FromContext(ctx) // We always filter by driver name, by node name only for node-local resources. - selector := fields.Set{"driverName": c.driverName} + selector := fields.Set{resourceapi.ResourceSliceSelectorDriver: c.driver} if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { - selector["nodeName"] = c.owner.Name + selector[resourceapi.ResourceSliceSelectorNodeName] = c.owner.Name } informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) { options.FieldSelector = selector.String() @@ -166,7 +196,7 @@ func (c *Controller) run(ctx context.Context) { return } logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice)) - c.queue.Add("") + c.queue.Add(slice.Spec.Pool.Name) }, UpdateFunc: func(old, new any) { oldSlice, ok := old.(*resourceapi.ResourceSlice) @@ -182,7 +212,8 @@ func (c *Controller) run(ctx context.Context) { } else { logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice)) } - c.queue.Add("") + c.queue.Add(oldSlice.Spec.Pool.Name) + c.queue.Add(newSlice.Spec.Pool.Name) }, DeleteFunc: func(obj any) { if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { @@ -193,7 +224,7 @@ func (c *Controller) run(ctx context.Context) { return } logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice)) - c.queue.Add("") + c.queue.Add(slice.Spec.Pool.Name) }, }) if err != nil { @@ -219,16 +250,19 @@ func (c *Controller) run(ctx context.Context) { } logger.V(3).Info("ResourceSlice informer has synced") + // Seed the + for c.processNextWorkItem(ctx) { } } func (c *Controller) processNextWorkItem(ctx context.Context) bool { - key, shutdown := c.queue.Get() + poolName, shutdown := c.queue.Get() if shutdown { return false } - defer c.queue.Done(key) + defer c.queue.Done(poolName) + logger := klog.FromContext(ctx) // Panics are caught and treated like errors. var err error @@ -238,154 +272,157 @@ func (c *Controller) processNextWorkItem(ctx context.Context) bool { err = fmt.Errorf("internal error: %v", r) } }() - err = c.sync(ctx) + err = c.syncPool(klog.NewContext(ctx, klog.LoggerWithValues(logger, "poolName", poolName)), poolName) }() if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "processing ResourceSlice objects") - c.queue.AddRateLimited(key) + c.queue.AddRateLimited(poolName) // Return without removing the work item from the queue. // It will be retried. return true } - c.queue.Forget(key) + c.queue.Forget(poolName) return true } -func (c *Controller) sync(ctx context.Context) error { +// syncPool processes one pool. Only runs inside a single worker, so there +// is no need for locking except when accessing c.resources, which may +// be updated at any time by the user of the controller. +func (c *Controller) syncPool(ctx context.Context, poolName string) error { logger := klog.FromContext(ctx) // Gather information about the actual and desired state. - slices := c.sliceStore.List() - var resources *Resources + // TODO: index by pool name. 
+ var slices []*resourceapi.ResourceSlice + for _, obj := range c.sliceStore.List() { + if slice, ok := obj.(*resourceapi.ResourceSlice); ok && slice.Spec.Pool.Name == poolName { + slices = append(slices, slice) + } + } + var resources *DriverResources c.mutex.RLock() resources = c.resources c.mutex.RUnlock() - // Resources that are not yet stored in any slice need to be published. - // Here we track the indices of any resources that are already stored. - storedResourceIndices := sets.New[int]() + // Retrieve node object to get UID? + // The result gets cached and is expected to not change while + // the controller runs. + var nodeName string + if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { + nodeName = c.owner.Name + if c.owner.UID == "" { + node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("retrieve node %q: %w", c.owner.Name, err) + } + // There is only one worker, so no locking needed. + c.owner.UID = node.UID + } + } // Slices that don't match any driver resource can either be updated (if there // are new driver resources that need to be stored) or they need to be deleted. obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices)) - // Match slices with resource information. - for _, obj := range slices { - slice := obj.(*resourceapi.ResourceSlice) - - // TODO: network-attached resources. - index := indexOfModel(resources.NodeResources, &slice.ResourceModel) - if index >= 0 { - storedResourceIndices.Insert(index) - continue + // Determine highest generation. + var generation int64 + for _, slice := range slices { + if slice.Spec.Pool.Generation > generation { + generation = slice.Spec.Pool.Generation } - - obsoleteSlices = append(obsoleteSlices, slice) } - if loggerV := logger.V(6); loggerV.Enabled() { - // Dump entire resource information. - loggerV.Info("Syncing existing driver resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", resources) - } else { - logger.V(5).Info("Syncing existing driver resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(resources.NodeResources)) - } - - // Retrieve node object to get UID? - // The result gets cached and is expected to not change while - // the controller runs. - if c.owner.UID == "" && c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { - node, err := c.kubeClient.CoreV1().Nodes().Get(ctx, c.owner.Name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("retrieve node %q: %w", c.owner.Name, err) + // Everything older is obsolete. + currentSlices := make([]*resourceapi.ResourceSlice, 0, len(slices)) + for _, slice := range slices { + if slice.Spec.Pool.Generation < generation { + obsoleteSlices = append(obsoleteSlices, slice) + } else { + currentSlices = append(currentSlices, slice) } - // There is only one worker, so no locking needed. - c.owner.UID = node.UID } + slices = currentSlices - // Update stale slices before removing what's left. - // - // We don't really know which of these slices might have - // been used for "the" driver resource because they don't - // have a unique ID. In practice, a driver is most likely - // to just give us one ResourceModel, in which case - // this isn't a problem at all. If we have more than one, - // then at least conceptually it currently doesn't matter - // where we publish it. - // - // The long-term goal is to move the handling of - // ResourceSlice objects into the driver, with kubelet - // just acting as a REST proxy. 
The advantage of that will - // be that kubelet won't need to support the same - // resource API version as the driver and the control plane. - // With that approach, the driver will be able to match - // up objects more intelligently. - numObsoleteSlices := len(obsoleteSlices) - for index, resource := range resources.NodeResources { - if storedResourceIndices.Has(index) { - // No need to do anything, it is already stored exactly - // like this in an existing slice. - continue + if pool, ok := resources.Pools[poolName]; ok { + if pool.Generation > generation { + generation = pool.Generation } - if numObsoleteSlices > 0 { - // Update one existing slice. - slice := obsoleteSlices[numObsoleteSlices-1] - numObsoleteSlices-- - slice = slice.DeepCopy() - slice.ResourceModel = *resource - logger.V(5).Info("Reusing existing resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("update resource slice: %w", err) + // Right now all devices get published in a single slice. + // We simply pick the first one, if there is one, and copy + // it in preparation for updating it. + // + // TODO: support splitting across slices, with unit tests. + if len(slices) > 0 { + obsoleteSlices = append(obsoleteSlices, slices[1:]...) + slices = []*resourceapi.ResourceSlice{slices[0].DeepCopy()} + } else { + slices = []*resourceapi.ResourceSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + GenerateName: c.owner.Name + "-" + c.driver + "-", + }, + }, + } + } + + slice := slices[0] + slice.OwnerReferences = []metav1.OwnerReference{{ + APIVersion: c.owner.APIVersion, + Kind: c.owner.Kind, + Name: c.owner.Name, + UID: c.owner.UID, + Controller: ptr.To(true), + }} + slice.Spec.Driver = c.driver + slice.Spec.Pool.Name = poolName + slice.Spec.Pool.Generation = generation + slice.Spec.Pool.ResourceSliceCount = 1 + slice.Spec.NodeName = nodeName + slice.Spec.NodeSelector = pool.NodeSelector + slice.Spec.AllNodes = pool.NodeSelector == nil && nodeName == "" + slice.Spec.Devices = pool.Devices + + if loggerV := logger.V(6); loggerV.Enabled() { + // Dump entire resource information. + loggerV.Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "pool", pool) + } else { + logger.V(5).Info("Syncing resource slices", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) + } + } else if len(slices) > 0 { + // All are obsolete, pool does not exist anymore. + + logger.V(5).Info("Removing resource slices after pool removal", "obsoleteSlices", klog.KObjSlice(obsoleteSlices), "slices", klog.KObjSlice(slices), "numDevices", len(pool.Devices)) + obsoleteSlices = append(obsoleteSlices, slices...) + } + + // Remove stale slices. + for _, slice := range obsoleteSlices { + logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice)) + if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("delete resource slice: %w", err) + } + } + + // Create or update slices. 
+ for _, slice := range slices { + if slice.UID == "" { + logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("create resource slice: %w", err) } continue } - // Create a new slice. - slice := &resourceapi.ResourceSlice{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: c.owner.Name + "-" + c.driverName + "-", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: c.owner.APIVersion, - Kind: c.owner.Kind, - Name: c.owner.Name, - UID: c.owner.UID, - Controller: ptr.To(true), - }, - }, - }, - DriverName: c.driverName, - ResourceModel: *resource, - } - if c.owner.APIVersion == "v1" && c.owner.Kind == "Node" { - slice.NodeName = c.owner.Name - } - logger.V(5).Info("Creating new resource slice", "slice", klog.KObj(slice)) - if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("create resource slice: %w", err) - } - } - - // All remaining slices are truly orphaned. - for i := 0; i < numObsoleteSlices; i++ { - slice := obsoleteSlices[i] - logger.V(5).Info("Deleting obsolete resource slice", "slice", klog.KObj(slice)) - if err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil { + // TODO: switch to SSA once unit testing supports it. + logger.V(5).Info("Updating existing resource slice", "slice", klog.KObj(slice)) + if _, err := c.kubeClient.ResourceV1alpha3().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("delete resource slice: %w", err) + return fmt.Errorf("update resource slice: %w", err) } } return nil } - -func indexOfModel(models []*resourceapi.ResourceModel, model *resourceapi.ResourceModel) int { - for index, m := range models { - if apiequality.Semantic.DeepEqual(m, model) { - return index - } - } - return -1 -}
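Illustrative only, not part of the patch: a minimal sketch of what a control-plane driver implements after this change, assuming the Driver interface is now just the three methods exercised by mockDriver above (Allocate, Deallocate, UnsuitableNodes) and that ClaimAllocation carries the DeviceClasses map added here. The package, type and driver names are made up.

```go
package example

import (
	"context"

	v1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1alpha3"

	"k8s.io/dynamic-resource-allocation/controller"
)

// exampleDriver no longer resolves class/claim parameters itself; the
// controller looks up the DeviceClass for every request and passes it in
// via ClaimAllocation.DeviceClasses.
type exampleDriver struct{}

// Allocate reports success by setting Allocation (or failure by setting
// Error) on each ClaimAllocation; the caller fills in Allocation.Controller
// and writes the claim status.
func (d exampleDriver) Allocate(ctx context.Context, claims []*controller.ClaimAllocation, selectedNode string) {
	for _, ca := range claims {
		for className := range ca.DeviceClasses {
			_ = className // a real driver would pick devices per class here
		}
		ca.Allocation = &resourceapi.AllocationResult{}
	}
}

// Deallocate frees whatever Allocate reserved for the claim.
func (d exampleDriver) Deallocate(ctx context.Context, claim *resourceapi.ResourceClaim) error {
	return nil
}

// UnsuitableNodes records nodes that cannot satisfy the pending claims.
func (d exampleDriver) UnsuitableNodes(ctx context.Context, pod *v1.Pod, claims []*controller.ClaimAllocation, potentialNodes []string) error {
	for _, ca := range claims {
		ca.UnsuitableNodes = nil // every potential node is acceptable in this sketch
	}
	return nil
}
```

Wiring is unchanged from the test above: `ctrl := controller.New(ctx, "example.com/driver", exampleDriver{}, kubeClient, informerFactory)` followed by `ctrl.Run(workers)`.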
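For orientation on the checkPodClaim changes: a classic-DRA claim now names its driver in spec.controller and references a DeviceClass per request, instead of a single spec.resourceClassName. A hypothetical claim in Go form; the DeviceClaim and DeviceRequest type names are assumed from the v1alpha3 API (only the field paths spec.controller, spec.devices.requests[].name and .deviceClassName appear in this patch), and all names are placeholders.

```go
package example

import (
	resourceapi "k8s.io/api/resource/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleClaim is handled by checkPodClaim only if Spec.Controller matches
// the controller's driver name; each request's DeviceClassName is resolved
// via the DeviceClass lister into ClaimAllocation.DeviceClasses.
var exampleClaim = &resourceapi.ResourceClaim{
	ObjectMeta: metav1.ObjectMeta{Name: "gpu-claim", Namespace: "default"},
	Spec: resourceapi.ResourceClaimSpec{
		Controller: "example.com/driver",
		Devices: resourceapi.DeviceClaim{
			Requests: []resourceapi.DeviceRequest{{
				Name:            "gpu",
				DeviceClassName: "example-gpu-class",
			}},
		},
	},
}
```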
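On the kubelet-plugin side, PublishResources now takes a Resources struct with a flat list of devices instead of a []*resourceapi.ResourceModel. A sketch against the DRAPlugin interface from this patch; the helper and device names are invented, and device attributes/capacity are left out.

```go
package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha3"

	"k8s.io/dynamic-resource-allocation/kubeletplugin"
)

// publishLocalDevices hands two made-up devices to an already started plugin.
// The plugin forwards them to its embedded resourceslice controller, which
// publishes them as a single node-local pool keyed by the node name.
func publishLocalDevices(ctx context.Context, d kubeletplugin.DRAPlugin) {
	resources := kubeletplugin.Resources{
		Devices: []resourceapi.Device{
			{Name: "gpu-0"},
			{Name: "gpu-1"},
		},
	}
	// The content must not be modified after this call.
	d.PublishResources(ctx, resources)
}
```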
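The renamed resourceslice controller can also be driven directly, which is essentially what draplugin.go now does internally. A sketch using only the symbols introduced in this patch (StartController, Owner, DriverResources, Pool); the driver name is a placeholder.

```go
package example

import (
	"context"

	resourceapi "k8s.io/api/resource/v1alpha3"
	"k8s.io/client-go/kubernetes"

	"k8s.io/dynamic-resource-allocation/resourceslice"
)

// publishNodePool publishes one node-local pool, keyed by the node name.
func publishNodePool(ctx context.Context, kubeClient kubernetes.Interface, nodeName string, devices []resourceapi.Device) *resourceslice.Controller {
	owner := resourceslice.Owner{
		APIVersion: "v1",
		Kind:       "Node",
		Name:       nodeName,
		// UID left blank: for a Node owner the controller looks it up itself.
	}
	driverResources := &resourceslice.DriverResources{
		Pools: map[string]resourceslice.Pool{
			nodeName: {Devices: devices},
		},
	}
	return resourceslice.StartController(ctx, kubeClient, "example.com/driver", owner, driverResources)
}
```

Subsequent changes go through Controller.Update, which re-queues both the old and the new pools, and Controller.Stop shuts the controller down.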