diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go
index c230e72d08f..89f1a68c884 100644
--- a/pkg/kubelet/cm/dra/claiminfo.go
+++ b/pkg/kubelet/cm/dra/claiminfo.go
@@ -57,27 +57,37 @@ type claimInfoCache struct {
 	claimInfo map[string]*claimInfo
 }
 
-func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], cdiDevice []string) (*claimInfo, error) {
+func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *claimInfo {
 	claimInfoState := state.ClaimInfoState{
 		DriverName: driverName,
 		ClaimUID:   claimUID,
 		ClaimName:  claimName,
 		Namespace:  namespace,
 		PodUIDs:    podUIDs,
-		CdiDevices: cdiDevice,
-	}
-	// NOTE: Passing CDI device names as annotations is a temporary solution
-	// It will be removed after all runtimes are updated
-	// to get CDI device names from the ContainerConfig.CDIDevices field
-	annotations, err := generateCDIAnnotations(claimUID, driverName, cdiDevice)
-	if err != nil {
-		return nil, fmt.Errorf("failed to generate container annotations, err: %+v", err)
 	}
 	claimInfo := claimInfo{
 		ClaimInfoState: claimInfoState,
-		annotations:    annotations,
 	}
-	return &claimInfo, nil
+	return &claimInfo
+}
+
+func (info *claimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
+	// NOTE: Passing CDI device names as annotations is a temporary solution
+	// It will be removed after all runtimes are updated
+	// to get CDI device names from the ContainerConfig.CDIDevices field
+	annotations, err := generateCDIAnnotations(info.ClaimUID, info.DriverName, cdiDevices)
+	if err != nil {
+		return fmt.Errorf("failed to generate container annotations, err: %+v", err)
+	}
+
+	if info.CDIDevices == nil {
+		info.CDIDevices = make(map[string][]string)
+	}
+
+	info.CDIDevices[pluginName] = cdiDevices
+	info.annotations = append(info.annotations, annotations...)
+
+	return nil
 }
 
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
@@ -98,16 +108,18 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
 	}
 
 	for _, entry := range curState {
-		info, err := newClaimInfo(
+		info := newClaimInfo(
 			entry.DriverName,
 			entry.ClaimUID,
 			entry.ClaimName,
 			entry.Namespace,
 			entry.PodUIDs,
-			entry.CdiDevices,
 		)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create claimInfo %+v: %+v", info, err)
+		for pluginName, cdiDevices := range entry.CDIDevices {
+			err := info.addCDIDevices(pluginName, cdiDevices)
+			if err != nil {
+				return nil, fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", info, err)
+			}
 		}
 		cache.add(info)
 	}
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index b3df135d803..69dfa6eac24 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -74,11 +74,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 
 			// Resource is already prepared, add pod UID to it
 			if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
-				// We delay checkpointing of this change until this call returns successfully.
-				// It is OK to do this because we will only return successfully from this call if
-				// the checkpoint has succeeded. That means if the kubelet is ever restarted
-				// before this checkpoint succeeds, the pod whose resources are being prepared
-				// would never have started, so it's OK (actually correct) to not include it in the cache.
+				// We delay checkpointing of this change until this call
+				// returns successfully. It is OK to do this because we
+				// will only return successfully from this call if the
+				// checkpoint has succeeded. That means if the kubelet is
+				// ever restarted before this checkpoint succeeds, the pod
+				// whose resources are being prepared would never have
+				// started, so it's OK (actually correct) to not include it
+				// in the cache.
 				claimInfo.addPodReference(pod.UID)
 				continue
 			}
@@ -98,48 +101,76 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 					pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
 			}
 
-			// Call NodePrepareResource RPC
-			driverName := resourceClaim.Status.DriverName
-
-			client, err := dra.NewDRAPluginClient(driverName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err)
+			// Build a slice of "resourceHandles" to group the name of the
+			// kubelet plugin to call NodePrepareResources() on and the
+			// ResourceHandle data to be processed by that plugin. For now
+			// this slice will only have a single entry, where the name of
+			// the kubelet plugin matches the DriverName. In the future we
+			// plan to allow each claim to be processed by multiple plugins
+			// (each with their own ResourceHandle) so this code is being
+			// written in a way to accommodate this.
+			resourceHandles := []struct {
+				KubeletPluginName string
+				Data              string
+			}{
+				{
+					KubeletPluginName: resourceClaim.Status.DriverName,
+					Data:              resourceClaim.Status.Allocation.ResourceHandle,
+				},
 			}
 
-			response, err := client.NodePrepareResource(
-				context.Background(),
-				resourceClaim.Namespace,
-				resourceClaim.UID,
-				resourceClaim.Name,
-				resourceClaim.Status.Allocation.ResourceHandle)
-			if err != nil {
-				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err)
-			}
-
-			klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
-
-			// TODO: We are adding the claimInfo struct to the cache and syncing it to the checkpoint *after* the NodePrepareResource
-			// call has completed. This will cause issues if the kubelet gets restarted between NodePrepareResource and syncToCheckpoint.
-			// It will result in not calling NodeUnprepareResource for this claim because no claimInfo will be synced back to the cache
-			// for it after the restart. We need to resolve this issue before moving to beta.
-			claimInfo, err := newClaimInfo(
-				driverName,
+			// Create a claimInfo object to store the relevant claim info.
+			claimInfo := newClaimInfo(
+				resourceClaim.Status.DriverName,
 				resourceClaim.UID,
 				resourceClaim.Name,
 				resourceClaim.Namespace,
 				sets.New(string(pod.UID)),
-				response.CdiDevices)
-			if err != nil {
-				return fmt.Errorf("newClaimInfo failed, claim UID: %s, claim name: %s, claim namespace: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceClaim.Namespace, err)
-			}
+			)
 
-			m.cache.add(claimInfo)
-			// Checkpoint to reduce redundant calls to NodePrepareResource() after a kubelet restart.
-			err = m.cache.syncToCheckpoint()
-			if err != nil {
-				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+			// Walk through each resourceHandle
+			for _, resourceHandle := range resourceHandles {
+				// Call NodePrepareResource RPC for each resourceHandle
+				client, err := dra.NewDRAPluginClient(resourceHandle.KubeletPluginName)
+				if err != nil {
+					return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", resourceHandle.KubeletPluginName, err)
+				}
+				response, err := client.NodePrepareResource(
+					context.Background(),
+					resourceClaim.Namespace,
+					resourceClaim.UID,
+					resourceClaim.Name,
+					resourceHandle.Data)
+				if err != nil {
+					return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
+						resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
+				}
+				klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", resourceHandle.KubeletPluginName, "response", response)
+
+				// Add the CDI Devices returned by NodePrepareResource to
+				// the claimInfo object.
+				err = claimInfo.addCDIDevices(resourceHandle.KubeletPluginName, response.CdiDevices)
+				if err != nil {
+					return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
+				}
+
+				// TODO: We (re)add the claimInfo object to the cache and
+				// sync it to the checkpoint *after* the
+				// NodePrepareResource call has completed. This will cause
+				// issues if the kubelet gets restarted between
+				// NodePrepareResource and syncToCheckpoint. It will result
+				// in not calling NodeUnprepareResource for this claim
+				// because no claimInfo will be synced back to the cache
+				// for it after the restart. We need to resolve this issue
+				// before moving to beta.
+				m.cache.add(claimInfo)
+
+				// Checkpoint to reduce redundant calls to
+				// NodePrepareResource() after a kubelet restart.
+				err = m.cache.syncToCheckpoint()
+				if err != nil {
+					return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+				}
 			}
 		}
 	}
@@ -173,8 +204,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 
 			klog.V(3).InfoS("Add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
 			annotations = append(annotations, claimInfo.annotations...)
-			for _, cdiDevice := range claimInfo.CdiDevices {
-				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: cdiDevice})
+			for _, devices := range claimInfo.CDIDevices {
+				for _, device := range devices {
+					cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
+				}
 			}
 		}
 	}
@@ -208,35 +241,36 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Call NodeUnprepareResource only for the last pod that references the claim
-		client, err := dra.NewDRAPluginClient(claimInfo.DriverName)
-		if err != nil {
-			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
+		// Loop through all plugins and call NodeUnprepareResource only for the
+		// last pod that references the claim
+		for pluginName, cdiDevices := range claimInfo.CDIDevices {
+			client, err := dra.NewDRAPluginClient(pluginName)
+			if err != nil {
+				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
+			}
+			response, err := client.NodeUnprepareResource(
+				context.Background(),
+				claimInfo.Namespace,
+				claimInfo.ClaimUID,
+				claimInfo.ClaimName,
+				cdiDevices)
+			if err != nil {
+				return fmt.Errorf(
+					"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
+					pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CDIDevices, err)
+			}
+			klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
 		}
 
-		response, err := client.NodeUnprepareResource(
-			context.Background(),
-			claimInfo.Namespace,
-			claimInfo.ClaimUID,
-			claimInfo.ClaimName,
-			claimInfo.CdiDevices)
-		if err != nil {
-			return fmt.Errorf(
-				"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
-				pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CdiDevices, err)
-		}
-
-		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
-
-		// Delete last pod UID only if NodeUnprepareResource call succeeds.
-		// This ensures that status manager doesn't enter termination status
-		// for the pod. This logic is implemented in the m.PodMightNeedToUnprepareResources
-		// and in the claimInfo.hasPodReference.
+		// Delete last pod UID only if all NodeUnprepareResource calls succeed.
+		// This ensures that the status manager doesn't enter termination status
+		// for the pod. This logic is implemented in
+		// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
 		claimInfo.deletePodReference(pod.UID)
-		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+
 		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
+		err := m.cache.syncToCheckpoint()
 		if err != nil {
 			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}
diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint.go b/pkg/kubelet/cm/dra/state/state_checkpoint.go
index 387c02eef2a..78da262bcb9 100644
--- a/pkg/kubelet/cm/dra/state/state_checkpoint.go
+++ b/pkg/kubelet/cm/dra/state/state_checkpoint.go
@@ -51,9 +51,9 @@ type ClaimInfoState struct {
 	// PodUIDs is a set of pod UIDs that reference a resource
 	PodUIDs sets.Set[string]
 
-	// CdiDevices is a list of CDI devices returned by the
+	// CDIDevices is a map of KubeletPluginName --> CDI devices returned by the
 	// GRPC API call NodePrepareResource
-	CdiDevices []string
+	CDIDevices map[string][]string
 }
 
 type stateCheckpoint struct {
diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint_test.go b/pkg/kubelet/cm/dra/state/state_checkpoint_test.go
index 2d561b019cd..0ed650d98e5 100644
--- a/pkg/kubelet/cm/dra/state/state_checkpoint_test.go
+++ b/pkg/kubelet/cm/dra/state/state_checkpoint_test.go
@@ -49,22 +49,70 @@ func TestCheckpointGetOrCreate(t *testing.T) {
 			[]ClaimInfoState{},
 		},
 		{
-			"Restore valid checkpoint",
-			`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CdiDevices":["example.com/example=cdi-example"]}],"checksum":2939981547}`,
+			"Restore checkpoint - single claim",
+			`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":1988120167}`,
 			"",
-			[]ClaimInfoState{{
-				DriverName: "test-driver.cdi.k8s.io",
-				ClaimUID:   "067798be-454e-4be4-9047-1aa06aea63f7",
-				ClaimName:  "example",
-				Namespace:  "default",
-				PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
-				CdiDevices: []string{"example.com/example=cdi-example"},
-			},
+			[]ClaimInfoState{
+				{
+					DriverName: "test-driver.cdi.k8s.io",
+					ClaimUID:   "067798be-454e-4be4-9047-1aa06aea63f7",
+					ClaimName:  "example",
+					Namespace:  "default",
+					PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
+					CDIDevices: map[string][]string{
+						"test-driver.cdi.k8s.io": {"example.com/example=cdi-example"},
+					},
+				},
 			},
 		},
 		{
-			"Restore checkpoint with invalid checksum",
-			`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CdiDevices":["example.com/example=cdi-example"]}],"checksum":2939981548}`,
+			"Restore checkpoint - single claim - multiple devices",
+			`{"version":"v1","entries":[{"DriverName":"meta-test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver-1.cdi.k8s.io":["example-1.com/example-1=cdi-example-1"],"test-driver-2.cdi.k8s.io":["example-2.com/example-2=cdi-example-2"]}}],"checksum":2113538068}`,
+			"",
+			[]ClaimInfoState{
+				{
+					DriverName: "meta-test-driver.cdi.k8s.io",
+					ClaimUID:   "067798be-454e-4be4-9047-1aa06aea63f7",
+					ClaimName:  "example",
+					Namespace:  "default",
+					PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
+					CDIDevices: map[string][]string{
+						"test-driver-1.cdi.k8s.io": {"example-1.com/example-1=cdi-example-1"},
+						"test-driver-2.cdi.k8s.io": {"example-2.com/example-2=cdi-example-2"},
+					},
+				},
+			},
+		},
+		{
+			"Restore checkpoint - multiple claims",
+			`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-1"]}},{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-2"]}}],"checksum":666680545}`,
+			"",
+			[]ClaimInfoState{
+				{
+					DriverName: "test-driver.cdi.k8s.io",
+					ClaimUID:   "067798be-454e-4be4-9047-1aa06aea63f7",
+					ClaimName:  "example-1",
+					Namespace:  "default",
+					PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
+					CDIDevices: map[string][]string{
+						"test-driver.cdi.k8s.io": {"example.com/example=cdi-example-1"},
+					},
+				},
+				{
+					DriverName: "test-driver.cdi.k8s.io",
+					ClaimUID:   "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c",
+					ClaimName:  "example-2",
+					Namespace:  "default",
+					PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
+					CDIDevices: map[string][]string{
+						"test-driver.cdi.k8s.io": {"example.com/example=cdi-example-2"},
+					},
+				},
+			},
+		},
+		{
+			"Restore checkpoint - invalid checksum",
+			`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":1988120168}`,
 			"checkpoint is corrupted",
 			[]ClaimInfoState{},
 		},
@@ -123,10 +171,12 @@ func TestCheckpointStateStore(t *testing.T) {
 		ClaimName:  "example",
 		Namespace:  "default",
 		PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
-		CdiDevices: []string{"example.com/example=cdi-example"},
+		CDIDevices: map[string][]string{
+			"test-driver.cdi.k8s.io": {"example.com/example=cdi-example"},
+		},
 	}
 
-	expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CdiDevices":["example.com/example=cdi-example"]}],"checksum":2939981547}`
+	expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":1988120167}`
 
 	// create temp dir
 	testingDir, err := os.MkdirTemp("", "dramanager_state_test")