From 08b942028f9ce3d9803bb690d24ad2ac0c634af1 Mon Sep 17 00:00:00 2001 From: adrianc Date: Sun, 10 Sep 2023 17:12:31 +0300 Subject: [PATCH 1/2] DRA: call plugins for claims even if exist in cache Today, DRA manager does not call plugin NodePrepareResource for claims that it previously successfully handled, that is, if claims are present in cache (checkpoint) even if node rebooted. After node reboots, it is required to call DRA plugin for resource claims so that plugins may prepare them again in case the resources dont persist reboot. To achieve that, once kubelet is started, we call DRA plugins for claims once if a pod sandbox is required to be created during PodSync. Signed-off-by: adrianc --- pkg/kubelet/cm/dra/claiminfo.go | 42 +++++++++++++++++++++-- pkg/kubelet/cm/dra/manager.go | 61 ++++++++++++++------------------- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go index 7266f9e72b2..d369b8d3e33 100644 --- a/pkg/kubelet/cm/dra/claiminfo.go +++ b/pkg/kubelet/cm/dra/claiminfo.go @@ -33,9 +33,10 @@ import ( type ClaimInfo struct { sync.RWMutex state.ClaimInfoState - // annotations is a list of container annotations associated with + // annotations is a mapping of container annotations per DRA plugin associated with // a prepared resource - annotations []kubecontainer.Annotation + annotations map[string][]kubecontainer.Annotation + prepared bool } func (info *ClaimInfo) addPodReference(podUID types.UID) { @@ -69,11 +70,23 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err } info.CDIDevices[pluginName] = cdiDevices - info.annotations = append(info.annotations, annotations...) + info.annotations[pluginName] = annotations return nil } +// annotationsAsList returns container annotations as a single list. +func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation { + info.RLock() + defer info.RUnlock() + + var lst []kubecontainer.Annotation + for _, v := range info.annotations { + lst = append(lst, v...) + } + return lst +} + // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name. type claimInfoCache struct { sync.RWMutex @@ -93,10 +106,33 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n } claimInfo := ClaimInfo{ ClaimInfoState: claimInfoState, + annotations: make(map[string][]kubecontainer.Annotation), } return &claimInfo } +// newClaimInfoFromResourceClaim creates a new ClaimInfo object +func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo { + // Grab the allocation.resourceHandles. If there are no + // allocation.resourceHandles, create a single resourceHandle with no + // content. This will trigger processing of this claim by a single + // kubelet plugin whose name matches resourceClaim.Status.DriverName. + resourceHandles := resourceClaim.Status.Allocation.ResourceHandles + if len(resourceHandles) == 0 { + resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1) + } + + return newClaimInfo( + resourceClaim.Status.DriverName, + resourceClaim.Spec.ResourceClassName, + resourceClaim.UID, + resourceClaim.Name, + resourceClaim.Namespace, + make(sets.Set[string]), + resourceHandles, + ) +} + // newClaimInfoCache is a function that returns an instance of the claimInfoCache. func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) { stateImpl, err := state.NewCheckpointState(stateDir, checkpointName) diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 703eae58b4f..62a2bd4cd4f 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -21,10 +21,8 @@ import ( "fmt" v1 "k8s.io/api/core/v1" - resourcev1alpha2 "k8s.io/api/resource/v1alpha2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/klog/v2" @@ -109,42 +107,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error { continue } - // Is the resource already prepared? Then add the pod UID to it. - if claimInfo := m.cache.get(*claimName, pod.Namespace); claimInfo != nil { - // We delay checkpointing of this change until this call - // returns successfully. It is OK to do this because we - // will only return successfully from this call if the - // checkpoint has succeeded. That means if the kubelet is - // ever restarted before this checkpoint succeeds, the pod - // whose resources are being prepared would never have - // started, so it's OK (actually correct) to not include it - // in the cache. - claimInfo.addPodReference(pod.UID) + claimInfo := m.cache.get(*claimName, pod.Namespace) + if claimInfo == nil { + // claim does not exist in cache, create new claimInfo object + // to be processed later. + claimInfo = newClaimInfoFromResourceClaim(resourceClaim) + } + + // We delay checkpointing of this change until this call + // returns successfully. It is OK to do this because we + // will only return successfully from this call if the + // checkpoint has succeeded. That means if the kubelet is + // ever restarted before this checkpoint succeeds, the pod + // whose resources are being prepared would never have + // started, so it's OK (actually correct) to not include it + // in the cache. + claimInfo.addPodReference(pod.UID) + + if claimInfo.prepared { + // Already prepared this claim, no need to prepare it again continue } - // Grab the allocation.resourceHandles. If there are no - // allocation.resourceHandles, create a single resourceHandle with no - // content. This will trigger processing of this claim by a single - // kubelet plugin whose name matches resourceClaim.Status.DriverName. - resourceHandles := resourceClaim.Status.Allocation.ResourceHandles - if len(resourceHandles) == 0 { - resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1) - } - - // Create a claimInfo object to store the relevant claim info. - claimInfo := newClaimInfo( - resourceClaim.Status.DriverName, - resourceClaim.Spec.ResourceClassName, - resourceClaim.UID, - resourceClaim.Name, - resourceClaim.Namespace, - sets.New(string(pod.UID)), - resourceHandles, - ) - // Loop through all plugins and prepare for calling NodePrepareResources. - for _, resourceHandle := range resourceHandles { + for _, resourceHandle := range claimInfo.ResourceHandles { // If no DriverName is provided in the resourceHandle, we // use the DriverName from the status pluginName := resourceHandle.DriverName @@ -193,6 +179,8 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error { if err != nil { return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err) } + // mark claim as (successfully) prepared by manager, so next time we dont prepare it. + claimInfo.prepared = true // TODO: We (re)add the claimInfo object to the cache and // sync it to the checkpoint *after* the @@ -291,8 +279,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta } claimInfo.RLock() - klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimInfo.annotations) - annotations = append(annotations, claimInfo.annotations...) + claimAnnotations := claimInfo.annotationsAsList() + klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations) + annotations = append(annotations, claimAnnotations...) for _, devices := range claimInfo.CDIDevices { for _, device := range devices { cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device}) From 3738111337e720ccf57fc7a4273db26c3ca0e947 Mon Sep 17 00:00:00 2001 From: adrianc Date: Mon, 9 Oct 2023 16:18:59 +0300 Subject: [PATCH 2/2] Add unit tests adjust existing tests and add new test flows to cover new DRA manager behaviour Signed-off-by: adrianc --- pkg/kubelet/cm/dra/manager_test.go | 237 ++++++++++++++++++++++++----- 1 file changed, 200 insertions(+), 37 deletions(-) diff --git a/pkg/kubelet/cm/dra/manager_test.go b/pkg/kubelet/cm/dra/manager_test.go index dacde727bf2..1e90a29693f 100644 --- a/pkg/kubelet/cm/dra/manager_test.go +++ b/pkg/kubelet/cm/dra/manager_test.go @@ -22,6 +22,7 @@ import ( "net" "os" "path/filepath" + "sync/atomic" "testing" "time" @@ -46,11 +47,15 @@ const ( type fakeDRADriverGRPCServer struct { drapbv1.UnimplementedNodeServer - driverName string - timeout *time.Duration + driverName string + timeout *time.Duration + prepareResourceCalls atomic.Uint32 + unprepareResourceCalls atomic.Uint32 } func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) { + s.prepareResourceCalls.Add(1) + if s.timeout != nil { time.Sleep(*s.timeout) } @@ -60,6 +65,8 @@ func (s *fakeDRADriverGRPCServer) NodePrepareResources(ctx context.Context, req } func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) { + s.unprepareResourceCalls.Add(1) + if s.timeout != nil { time.Sleep(*s.timeout) } @@ -68,10 +75,23 @@ func (s *fakeDRADriverGRPCServer) NodeUnprepareResources(ctx context.Context, re type tearDown func() -func setupFakeDRADriverGRPCServer(shouldTimeout bool) (string, tearDown, error) { +type fakeDRAServerInfo struct { + // fake DRA server + server *fakeDRADriverGRPCServer + // fake DRA plugin socket name + socketName string + // teardownFn stops fake gRPC server + teardownFn tearDown +} + +func setupFakeDRADriverGRPCServer(shouldTimeout bool) (fakeDRAServerInfo, error) { socketDir, err := os.MkdirTemp("", "dra") if err != nil { - return "", nil, err + return fakeDRAServerInfo{ + server: nil, + socketName: "", + teardownFn: nil, + }, err } socketName := filepath.Join(socketDir, "server.sock") @@ -85,7 +105,11 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool) (string, tearDown, error) l, err := net.Listen("unix", socketName) if err != nil { teardown() - return "", nil, err + return fakeDRAServerInfo{ + server: nil, + socketName: "", + teardownFn: nil, + }, err } s := grpc.NewServer() @@ -105,7 +129,11 @@ func setupFakeDRADriverGRPCServer(shouldTimeout bool) (string, tearDown, error) s.GracefulStop() }() - return socketName, teardown, nil + return fakeDRAServerInfo{ + server: fakeDRADriverGRPCServer, + socketName: socketName, + teardownFn: teardown, + }, nil } func TestNewManagerImpl(t *testing.T) { @@ -177,10 +205,12 @@ func TestGetResources(t *testing.T) { }, }, claimInfo: &ClaimInfo{ - annotations: []kubecontainer.Annotation{ - { - Name: "test-annotation", - Value: "123", + annotations: map[string][]kubecontainer.Annotation{ + "test-plugin": { + { + Name: "test-annotation", + Value: "123", + }, }, }, ClaimInfoState: state.ClaimInfoState{ @@ -280,14 +310,15 @@ func TestPrepareResources(t *testing.T) { fakeKubeClient := fake.NewSimpleClientset() for _, test := range []struct { - description string - driverName string - pod *v1.Pod - claimInfo *ClaimInfo - resourceClaim *resourcev1alpha2.ResourceClaim - wantErr bool - wantTimeout bool - wantResourceSkipped bool + description string + driverName string + pod *v1.Pod + claimInfo *ClaimInfo + resourceClaim *resourcev1alpha2.ResourceClaim + wantErr bool + wantTimeout bool + wantResourceSkipped bool + ExpectedPrepareCalls uint32 }{ { description: "failed to fetch ResourceClaim", @@ -497,6 +528,7 @@ func TestPrepareResources(t *testing.T) { Namespace: "test-namespace", PodUIDs: sets.Set[string]{"test-another-pod-reserved": sets.Empty{}}, }, + prepared: true, }, resourceClaim: &resourcev1alpha2.ResourceClaim{ ObjectMeta: metav1.ObjectMeta{ @@ -574,11 +606,12 @@ func TestPrepareResources(t *testing.T) { }, }, }, - wantErr: true, - wantTimeout: true, + wantErr: true, + wantTimeout: true, + ExpectedPrepareCalls: 1, }, { - description: "should prepare resource", + description: "should prepare resource, claim not in cache", driverName: driverName, pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -630,6 +663,78 @@ func TestPrepareResources(t *testing.T) { }, }, }, + ExpectedPrepareCalls: 1, + }, + { + description: "should prepare resource. claim in cache, manager did not prepare resource", + driverName: driverName, + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "test-namespace", + UID: "test-reserved", + }, + Spec: v1.PodSpec{ + ResourceClaims: []v1.PodResourceClaim{ + { + Name: "test-pod-claim", + Source: v1.ClaimSource{ResourceClaimName: func() *string { + s := "test-pod-claim" + return &s + }()}, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Claims: []v1.ResourceClaim{ + { + Name: "test-pod-claim", + }, + }, + }, + }, + }, + }, + }, + claimInfo: &ClaimInfo{ + ClaimInfoState: state.ClaimInfoState{ + DriverName: driverName, + ClassName: "test-class", + ClaimName: "test-pod-claim", + ClaimUID: "test-reserved", + Namespace: "test-namespace", + PodUIDs: sets.Set[string]{"test-reserved": sets.Empty{}}, + CDIDevices: map[string][]string{ + driverName: {fmt.Sprintf("%s/%s=some-device", driverName, driverClassName)}, + }, + ResourceHandles: []resourcev1alpha2.ResourceHandle{{Data: "test-data"}}, + }, + annotations: make(map[string][]kubecontainer.Annotation), + prepared: false, + }, + resourceClaim: &resourcev1alpha2.ResourceClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-claim", + Namespace: "test-namespace", + UID: "test-reserved", + }, + Spec: resourcev1alpha2.ResourceClaimSpec{ + ResourceClassName: "test-class", + }, + Status: resourcev1alpha2.ResourceClaimStatus{ + DriverName: driverName, + Allocation: &resourcev1alpha2.AllocationResult{ + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + {Data: "test-data"}, + }, + }, + ReservedFor: []resourcev1alpha2.ResourceClaimConsumerReference{ + {UID: "test-reserved"}, + }, + }, + }, + ExpectedPrepareCalls: 1, }, } { t.Run(test.description, func(t *testing.T) { @@ -649,14 +754,14 @@ func TestPrepareResources(t *testing.T) { } } - socketName, teardown, err := setupFakeDRADriverGRPCServer(test.wantTimeout) + draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout) if err != nil { t.Fatal(err) } - defer teardown() + defer draServerInfo.teardownFn() plg := plugin.NewRegistrationHandler() - if err := plg.RegisterPlugin(test.driverName, socketName, []string{"1.27"}); err != nil { + if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests @@ -666,6 +771,9 @@ func TestPrepareResources(t *testing.T) { } err = manager.PrepareResources(test.pod) + + assert.Equal(t, test.ExpectedPrepareCalls, draServerInfo.server.prepareResourceCalls.Load()) + if test.wantErr { assert.Error(t, err) return // PrepareResources returned an error so stopping the subtest here @@ -705,13 +813,14 @@ func TestUnprepareResources(t *testing.T) { fakeKubeClient := fake.NewSimpleClientset() for _, test := range []struct { - description string - driverName string - pod *v1.Pod - claimInfo *ClaimInfo - wantErr bool - wantTimeout bool - wantResourceSkipped bool + description string + driverName string + pod *v1.Pod + claimInfo *ClaimInfo + wantErr bool + wantTimeout bool + wantResourceSkipped bool + expectedUnprepareCalls uint32 }{ { description: "plugin does not exist", @@ -838,11 +947,12 @@ func TestUnprepareResources(t *testing.T) { }, }, }, - wantErr: true, - wantTimeout: true, + wantErr: true, + wantTimeout: true, + expectedUnprepareCalls: 1, }, { - description: "should unprepare resource", + description: "should unprepare resource, claim previously prepared by currently running manager", driverName: driverName, pod: &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -885,7 +995,57 @@ func TestUnprepareResources(t *testing.T) { }, }, }, + prepared: true, }, + expectedUnprepareCalls: 1, + }, + { + description: "should unprepare resource, claim previously was not prepared by currently running manager", + driverName: driverName, + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod", + Namespace: "test-namespace", + UID: "test-reserved", + }, + Spec: v1.PodSpec{ + ResourceClaims: []v1.PodResourceClaim{ + { + Name: "test-pod-claim", + Source: v1.ClaimSource{ResourceClaimName: func() *string { + s := "test-pod-claim" + return &s + }()}, + }, + }, + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Claims: []v1.ResourceClaim{ + { + Name: "test-pod-claim", + }, + }, + }, + }, + }, + }, + }, + claimInfo: &ClaimInfo{ + ClaimInfoState: state.ClaimInfoState{ + DriverName: driverName, + ClaimName: "test-pod-claim", + Namespace: "test-namespace", + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + { + DriverName: driverName, + Data: "test data", + }, + }, + }, + prepared: false, + }, + expectedUnprepareCalls: 1, }, } { t.Run(test.description, func(t *testing.T) { @@ -894,14 +1054,14 @@ func TestUnprepareResources(t *testing.T) { t.Fatalf("failed to create a new instance of the claimInfoCache, err: %v", err) } - socketName, teardown, err := setupFakeDRADriverGRPCServer(test.wantTimeout) + draServerInfo, err := setupFakeDRADriverGRPCServer(test.wantTimeout) if err != nil { t.Fatal(err) } - defer teardown() + defer draServerInfo.teardownFn() plg := plugin.NewRegistrationHandler() - if err := plg.RegisterPlugin(test.driverName, socketName, []string{"1.27"}); err != nil { + if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil { t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err) } defer plg.DeRegisterPlugin(test.driverName) // for sake of next tests @@ -916,6 +1076,9 @@ func TestUnprepareResources(t *testing.T) { } err = manager.UnprepareResources(test.pod) + + assert.Equal(t, test.expectedUnprepareCalls, draServerInfo.server.unprepareResourceCalls.Load()) + if test.wantErr { assert.Error(t, err) return // UnprepareResources returned an error so stopping the subtest here