diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go
index 7266f9e72b2..d369b8d3e33 100644
--- a/pkg/kubelet/cm/dra/claiminfo.go
+++ b/pkg/kubelet/cm/dra/claiminfo.go
@@ -33,9 +33,10 @@ import (
 type ClaimInfo struct {
 	sync.RWMutex
 	state.ClaimInfoState
-	// annotations is a list of container annotations associated with
+	// annotations is a mapping of container annotations per DRA plugin associated with
 	// a prepared resource
-	annotations []kubecontainer.Annotation
+	annotations map[string][]kubecontainer.Annotation
+	prepared    bool
 }
 
 func (info *ClaimInfo) addPodReference(podUID types.UID) {
@@ -69,11 +70,23 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
 	}
 
 	info.CDIDevices[pluginName] = cdiDevices
-	info.annotations = append(info.annotations, annotations...)
+	info.annotations[pluginName] = annotations
 
 	return nil
 }
 
+// annotationsAsList returns container annotations as a single list.
+func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
+	info.RLock()
+	defer info.RUnlock()
+
+	var lst []kubecontainer.Annotation
+	for _, v := range info.annotations {
+		lst = append(lst, v...)
+	}
+	return lst
+}
+
 // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
 type claimInfoCache struct {
 	sync.RWMutex
@@ -93,10 +106,33 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n
 	}
 	claimInfo := ClaimInfo{
 		ClaimInfoState: claimInfoState,
+		annotations:    make(map[string][]kubecontainer.Annotation),
 	}
 	return &claimInfo
 }
 
+// newClaimInfoFromResourceClaim creates a new ClaimInfo object
+func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
+	// Grab the allocation.resourceHandles. If there are no
+	// allocation.resourceHandles, create a single resourceHandle with no
+	// content. This will trigger processing of this claim by a single
+	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+	resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
+	if len(resourceHandles) == 0 {
+		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+	}
+
+	return newClaimInfo(
+		resourceClaim.Status.DriverName,
+		resourceClaim.Spec.ResourceClassName,
+		resourceClaim.UID,
+		resourceClaim.Name,
+		resourceClaim.Namespace,
+		make(sets.Set[string]),
+		resourceHandles,
+	)
+}
+
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
 func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
 	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index 703eae58b4f..62a2bd4cd4f 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -21,10 +21,8 @@ import (
 	"fmt"
 
 	v1 "k8s.io/api/core/v1"
-	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
@@ -109,42 +107,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Is the resource already prepared? Then add the pod UID to it.
-		if claimInfo := m.cache.get(*claimName, pod.Namespace); claimInfo != nil {
-			// We delay checkpointing of this change until this call
-			// returns successfully. It is OK to do this because we
-			// will only return successfully from this call if the
-			// checkpoint has succeeded. That means if the kubelet is
-			// ever restarted before this checkpoint succeeds, the pod
-			// whose resources are being prepared would never have
-			// started, so it's OK (actually correct) to not include it
-			// in the cache.
-			claimInfo.addPodReference(pod.UID)
+		claimInfo := m.cache.get(*claimName, pod.Namespace)
+		if claimInfo == nil {
+			// claim does not exist in cache, create new claimInfo object
+			// to be processed later.
+			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
+		}
+
+		// We delay checkpointing of this change until this call
+		// returns successfully. It is OK to do this because we
+		// will only return successfully from this call if the
+		// checkpoint has succeeded. That means if the kubelet is
+		// ever restarted before this checkpoint succeeds, the pod
+		// whose resources are being prepared would never have
+		// started, so it's OK (actually correct) to not include it
+		// in the cache.
+		claimInfo.addPodReference(pod.UID)
+
+		if claimInfo.prepared {
+			// Already prepared this claim, no need to prepare it again
 			continue
 		}
 
-		// Grab the allocation.resourceHandles. If there are no
-		// allocation.resourceHandles, create a single resourceHandle with no
-		// content. This will trigger processing of this claim by a single
-		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-		if len(resourceHandles) == 0 {
-			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-		}
-
-		// Create a claimInfo object to store the relevant claim info.
-		claimInfo := newClaimInfo(
-			resourceClaim.Status.DriverName,
-			resourceClaim.Spec.ResourceClassName,
-			resourceClaim.UID,
-			resourceClaim.Name,
-			resourceClaim.Namespace,
-			sets.New(string(pod.UID)),
-			resourceHandles,
-		)
-
 		// Loop through all plugins and prepare for calling NodePrepareResources.
-		for _, resourceHandle := range resourceHandles {
+		for _, resourceHandle := range claimInfo.ResourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
@@ -193,6 +179,8 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 		if err != nil {
 			return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 		}
+		// mark claim as (successfully) prepared by manager, so next time we don't prepare it.
+		claimInfo.prepared = true
 
 		// TODO: We (re)add the claimInfo object to the cache and
 		// sync it to the checkpoint *after* the
@@ -291,8 +279,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 			}
 
 			claimInfo.RLock()
-			klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimInfo.annotations)
-			annotations = append(annotations, claimInfo.annotations...)
+			claimAnnotations := claimInfo.annotationsAsList()
+			klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
+			annotations = append(annotations, claimAnnotations...)
 			for _, devices := range claimInfo.CDIDevices {
 				for _, device := range devices {
 					cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go
index dacde727bf2..1e90a29693f 100644
--- a/pkg/kubelet/cm/dra/manager_test.go
+++ b/pkg/kubelet/cm/dra/manager_test.go
@@ -22,6 +22,7 @@ import (
 	"net"
 	"os"
 	"path/filepath"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -46,11 +47,15 @@ const (
 
 type fakeDRADriverGRPCServer struct {
 	drapbv1.UnimplementedNodeServer
-	driverName string
-	timeout    *time.Duration
+	driverName             string
+	timeout                *time.Duration
+	prepareResourceCalls   atomic.Uint32
+	unprepareResourceCalls atomic.Uint32
 }
 
 func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
+	s.prepareResourceCalls.Add(1)
+
 	if s.timeout != nil {
 		time.Sleep(*s.timeout)
 	}
@@ -60,6 +65,8 @@ func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req
 }
 
 func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
+	s.unprepareResourceCalls.Add(1)
+
 	if s.timeout != nil {
 		time.Sleep(*s.timeout)
 	}
@@ -68,10 +75,23 @@ func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, re
 
 type tearDown func()
 
-func setupFakeDRADriverGRPCServer(shouldTimeout bool) (string, tearDown, error) {
+type fakeDRAServerInfo struct {
+	// fake DRA server
+	server *fakeDRADriverGRPCServer
+	// fake DRA plugin socket name
+	socketName string
+	// teardownFn stops fake gRPC server
+	teardownFn tearDown
+}
+
+func setupFakeDRADriverGRPCServer(shouldTimeout bool) (fakeDRAServerInfo, error) {
 	socketDir, err := os.MkdirTemp("", "dra")
 	if err != nil {
-		return "", nil, err
+		return fakeDRAServerInfo{
+			server:     nil,
+			socketName: "",
+			teardownFn: nil,
+		}, err
 	}
 	socketName := filepath.Join(socketDir, "server.sock")
 
@@ -85,7 +105,11 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool) (string, tearDown, error)
 	l, err := net.Listen("unix", socketName)
 	if err != nil {
 		teardown()
-		return "", nil, err
+		return fakeDRAServerInfo{
+			server:     nil,
+			socketName: "",
+			teardownFn: nil,
+		}, err
 	}
 
 	s := grpc.NewServer()
@@ -105,7 +129,11 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool) (string, tearDown, error)
 		s.GracefulStop()
 	}()
 
-	return socketName, teardown, nil
+	return fakeDRAServerInfo{
+		server:     fakeDRADriverGRPCServer,
+		socketName: socketName,
+		teardownFn: teardown,
+	}, nil
 }
 
 func TestNewManagerImpl(t *testing.T) {
@@ -177,10 +205,12 @@ func TestGetResources(t *testing.T) {
 				},
 			},
 			claimInfo: &ClaimInfo{
-				annotations: []kubecontainer.Annotation{
-					{
-						Name:  "test-annotation",
-						Value: "123",
+				annotations: map[string][]kubecontainer.Annotation{
+					"test-plugin": {
+						{
+							Name:  "test-annotation",
+							Value: "123",
+						},
 					},
 				},
 				ClaimInfoState: state.ClaimInfoState{
@@ -280,14 +310,15 @@ func TestPrepareResources(t *testing.T) {
 	fakeKubeClient := fake.NewSimpleClientset()
 
 	for _, test := range []struct {
-		description         string
-		driverName          string
-		pod                 *v1.Pod
-		claimInfo           *ClaimInfo
-		resourceClaim       *resourcev1alpha2.ResourceClaim
-		wantErr             bool
-		wantTimeout         bool
-		wantResourceSkipped bool
+		description          string
+		driverName           string
+		pod                  *v1.Pod
+		claimInfo            *ClaimInfo
+		resourceClaim        *resourcev1alpha2.ResourceClaim
+		wantErr              bool
+		wantTimeout          bool
+		wantResourceSkipped  bool
+		ExpectedPrepareCalls uint32
 	}{
 		{
 			description: "failed to fetch ResourceClaim",
@@ -497,6 +528,7 @@ func TestPrepareResources(t *testing.T) {
 					Namespace: "test-namespace",
 					PodUIDs:   sets.Set[string]{"test-another-pod-reserved": sets.Empty{}},
 				},
+				prepared: true,
 			},
 			resourceClaim: &resourcev1alpha2.ResourceClaim{
 				ObjectMeta: metav1.ObjectMeta{
@@ -574,11 +606,12 @@ func TestPrepareResources(t *testing.T) {
 					},
 				},
 			},
-			wantErr:     true,
-			wantTimeout: true,
+			wantErr:              true,
+			wantTimeout:          true,
+			ExpectedPrepareCalls: 1,
 		},
 		{
-			description: "should prepare resource",
+			description: "should prepare resource, claim not in cache",
 			driverName:  driverName,
 			pod: &v1.Pod{
 				ObjectMeta: metav1.ObjectMeta{
@@ -630,6 +663,78 @@ func TestPrepareResources(t *testing.T) {
 					},
 				},
 			},
+			ExpectedPrepareCalls: 1,
+		},
+		{
+			description: "should prepare resource. claim in cache, manager did not prepare resource",
+			driverName:  driverName,
+			pod: &v1.Pod{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-pod",
+					Namespace: "test-namespace",
+					UID:       "test-reserved",
+				},
+				Spec: v1.PodSpec{
+					ResourceClaims: []v1.PodResourceClaim{
+						{
+							Name: "test-pod-claim",
+							Source: v1.ClaimSource{ResourceClaimName: func() *string {
+								s := "test-pod-claim"
+								return &s
+							}()},
+						},
+					},
+					Containers: []v1.Container{
+						{
+							Resources: v1.ResourceRequirements{
+								Claims: []v1.ResourceClaim{
+									{
+										Name: "test-pod-claim",
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			claimInfo: &ClaimInfo{
+				ClaimInfoState: state.ClaimInfoState{
+					DriverName: driverName,
+					ClassName:  "test-class",
+					ClaimName:  "test-pod-claim",
+					ClaimUID:   "test-reserved",
+					Namespace:  "test-namespace",
+					PodUIDs:    sets.Set[string]{"test-reserved": sets.Empty{}},
+					CDIDevices: map[string][]string{
+						driverName: {fmt.Sprintf("%s/%s=some-device", driverName, driverClassName)},
+					},
+					ResourceHandles: []resourcev1alpha2.ResourceHandle{{Data: "test-data"}},
+				},
+				annotations: make(map[string][]kubecontainer.Annotation),
+				prepared:    false,
+			},
+			resourceClaim: &resourcev1alpha2.ResourceClaim{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "test-pod-claim",
+					Namespace: "test-namespace",
+					UID:       "test-reserved",
+				},
+				Spec: resourcev1alpha2.ResourceClaimSpec{
+					ResourceClassName: "test-class",
+				},
+				Status: resourcev1alpha2.ResourceClaimStatus{
+					DriverName: driverName,
+					Allocation: &resourcev1alpha2.AllocationResult{
+						ResourceHandles: []resourcev1alpha2.ResourceHandle{
+							{Data: "test-data"},
+						},
+					},
+					ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{
+						{UID: "test-reserved"},
+					},
+				},
+			},
+			ExpectedPrepareCalls: 1,
 		},
 	} {
 		t.Run(test.description, func(t *testing.T) {
@@ -649,14 +754,14 @@ func TestPrepareResources(t *testing.T) {
 				}
 			}
 
-			socketName, teardown, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
+			draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
 			if err != nil {
 				t.Fatal(err)
 			}
-			defer teardown()
+			defer draServerInfo.teardownFn()
 
 			plg := plugin.NewRegistrationHandler()
-			if err := plg.RegisterPlugin(test.driverName, socketName, []string{"1.27"}); err != nil {
+			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
 			defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
@@ -666,6 +771,9 @@ func TestPrepareResources(t *testing.T) {
 			}
 
 			err = manager.PrepareResources(test.pod)
+
+			assert.Equal(t, test.ExpectedPrepareCalls, draServerInfo.server.prepareResourceCalls.Load())
+
 			if test.wantErr {
 				assert.Error(t, err)
 				return // PrepareResources returned an error so stopping the subtest here
@@ -705,13 +813,14 @@ func TestUnprepareResources(t *testing.T) {
 	fakeKubeClient := fake.NewSimpleClientset()
 
 	for _, test := range []struct {
-		description         string
-		driverName          string
-		pod                 *v1.Pod
-		claimInfo           *ClaimInfo
-		wantErr             bool
-		wantTimeout         bool
-		wantResourceSkipped bool
+		description            string
+		driverName             string
+		pod                    *v1.Pod
+		claimInfo              *ClaimInfo
+		wantErr                bool
+		wantTimeout            bool
+		wantResourceSkipped    bool
+		expectedUnprepareCalls uint32
 	}{
 		{
 			description: "plugin does not exist",
@@ -838,11 +947,12 @@ func TestUnprepareResources(t *testing.T) {
 					},
 				},
 			},
-			wantErr:     true,
-			wantTimeout: true,
+			wantErr:                true,
+			wantTimeout:            true,
+			expectedUnprepareCalls: 1,
 		},
 		{
-			description: "should unprepare resource",
+			description: "should unprepare resource, claim previously prepared by currently running manager",
 			driverName:  driverName,
 			pod: &v1.Pod{
 				ObjectMeta: metav1.ObjectMeta{
@@ -885,7 +995,57 @@ func TestUnprepareResources(t *testing.T) {
 					},
 				},
 			},
+			prepared: true,
 		},
+		expectedUnprepareCalls: 1,
+	},
+	{
+		description: "should unprepare resource, claim previously was not prepared by currently running manager",
+		driverName:  driverName,
+		pod: &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "test-pod",
+				Namespace: "test-namespace",
+				UID:       "test-reserved",
+			},
+			Spec: v1.PodSpec{
+				ResourceClaims: []v1.PodResourceClaim{
+					{
+						Name: "test-pod-claim",
+						Source: v1.ClaimSource{ResourceClaimName: func() *string {
+							s := "test-pod-claim"
+							return &s
+						}()},
+					},
+				},
+				Containers: []v1.Container{
+					{
+						Resources: v1.ResourceRequirements{
+							Claims: []v1.ResourceClaim{
+								{
+									Name: "test-pod-claim",
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+		claimInfo: &ClaimInfo{
+			ClaimInfoState: state.ClaimInfoState{
+				DriverName: driverName,
+				ClaimName:  "test-pod-claim",
+				Namespace:  "test-namespace",
+				ResourceHandles: []resourcev1alpha2.ResourceHandle{
+					{
+						DriverName: driverName,
+						Data:       "test data",
+					},
+				},
+			},
+			prepared: false,
+		},
+		expectedUnprepareCalls: 1,
 		},
 	} {
 		t.Run(test.description, func(t *testing.T) {
@@ -894,14 +1054,14 @@ func TestUnprepareResources(t *testing.T) {
 				t.Fatalf("failed to create a new instance of the claimInfoCache, err: %v", err)
 			}
 
-			socketName, teardown, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
+			draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout)
 			if err != nil {
 				t.Fatal(err)
 			}
-			defer teardown()
+			defer draServerInfo.teardownFn()
 
 			plg := plugin.NewRegistrationHandler()
-			if err := plg.RegisterPlugin(test.driverName, socketName, []string{"1.27"}); err != nil {
+			if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
 				t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
 			}
 			defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests
@@ -916,6 +1076,9 @@ func TestUnprepareResources(t *testing.T) {
 			}
 
 			err = manager.UnprepareResources(test.pod)
+
+			assert.Equal(t, test.expectedUnprepareCalls, draServerInfo.server.unprepareResourceCalls.Load())
+
 			if test.wantErr {
 				assert.Error(t, err)
 				return // UnprepareResources returned an error so stopping the subtest here