From 685688c7031c5d17454e003de881698783cce1d6 Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Sat, 11 Mar 2023 14:17:37 +0000
Subject: [PATCH] Update DRAManager to allow multiple plugins to process a
 single claim

Right now, the v1alpha1 API only passes enough information for one plugin
to process a claim, but the v1alpha2 API will allow for multiple plugins to
process a claim. This commit prepares the code for this upcoming change.

Signed-off-by: Kevin Klues
---
 pkg/kubelet/cm/dra/claiminfo.go |  42 +++++---
 pkg/kubelet/cm/dra/manager.go   | 166 +++++++++++++++++++-------------
 2 files changed, 127 insertions(+), 81 deletions(-)

diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go
index c230e72d08f..89f1a68c884 100644
--- a/pkg/kubelet/cm/dra/claiminfo.go
+++ b/pkg/kubelet/cm/dra/claiminfo.go
@@ -57,27 +57,37 @@ type claimInfoCache struct {
 	claimInfo map[string]*claimInfo
 }
 
-func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], cdiDevice []string) (*claimInfo, error) {
+func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *claimInfo {
 	claimInfoState := state.ClaimInfoState{
 		DriverName: driverName,
 		ClaimUID:   claimUID,
 		ClaimName:  claimName,
 		Namespace:  namespace,
 		PodUIDs:    podUIDs,
-		CdiDevices: cdiDevice,
-	}
-	// NOTE: Passing CDI device names as annotations is a temporary solution
-	// It will be removed after all runtimes are updated
-	// to get CDI device names from the ContainerConfig.CDIDevices field
-	annotations, err := generateCDIAnnotations(claimUID, driverName, cdiDevice)
-	if err != nil {
-		return nil, fmt.Errorf("failed to generate container annotations, err: %+v", err)
 	}
 	claimInfo := claimInfo{
 		ClaimInfoState: claimInfoState,
-		annotations:    annotations,
 	}
-	return &claimInfo, nil
+	return &claimInfo
+}
+
+func (info *claimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
+	// NOTE: Passing CDI device names as annotations is a temporary solution
+	// It will be removed after all runtimes are updated
+	// to get CDI device names from the ContainerConfig.CDIDevices field
+	annotations, err := generateCDIAnnotations(info.ClaimUID, info.DriverName, cdiDevices)
+	if err != nil {
+		return fmt.Errorf("failed to generate container annotations, err: %+v", err)
+	}
+
+	if info.CDIDevices == nil {
+		info.CDIDevices = make(map[string][]string)
+	}
+
+	info.CDIDevices[pluginName] = cdiDevices
+	info.annotations = append(info.annotations, annotations...)
+
+	return nil
 }
 
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
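The hunk above replaces the single CdiDevices slice in ClaimInfoState with a CDIDevices map keyed by kubelet plugin name, filled in one plugin at a time through addCDIDevices(). The following self-contained sketch illustrates that accumulation pattern; the trimmed-down claimInfo type and the plugin names are illustrative stand-ins rather than the real kubelet types:

// Illustrative sketch only: a stand-in for the claimInfo type above, showing
// how CDI devices reported by several kubelet plugins accumulate into a
// single per-claim map. The plugin names are made up for the example.
package main

import "fmt"

type claimInfo struct {
	// CDIDevices is keyed by kubelet plugin name, one entry per plugin
	// that helped prepare the claim.
	CDIDevices map[string][]string
}

func (info *claimInfo) addCDIDevices(pluginName string, cdiDevices []string) {
	if info.CDIDevices == nil {
		info.CDIDevices = make(map[string][]string)
	}
	info.CDIDevices[pluginName] = cdiDevices
}

func main() {
	info := &claimInfo{}
	// With the v1alpha1 API a single plugin (matching DriverName) prepares
	// the claim; with v1alpha2 several plugins may each contribute devices.
	info.addCDIDevices("gpu.example.com", []string{"example.com/gpu=gpu0"})
	info.addCDIDevices("nic.example.com", []string{"example.com/nic=nic1"})
	fmt.Println(info.CDIDevices)
}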
@@ -98,16 +108,18 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
 	}
 
 	for _, entry := range curState {
-		info, err := newClaimInfo(
+		info := newClaimInfo(
 			entry.DriverName,
 			entry.ClaimUID,
 			entry.ClaimName,
 			entry.Namespace,
 			entry.PodUIDs,
-			entry.CdiDevices,
 		)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create claimInfo %+v: %+v", info, err)
+		for pluginName, cdiDevices := range entry.CDIDevices {
+			err := info.addCDIDevices(pluginName, cdiDevices)
+			if err != nil {
+				return nil, fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", info, err)
+			}
 		}
 		cache.add(info)
 	}
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index b3df135d803..69dfa6eac24 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -74,11 +74,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 
 			// Resource is already prepared, add pod UID to it
 			if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
-				// We delay checkpointing of this change until this call returns successfully.
-				// It is OK to do this because we will only return successfully from this call if
-				// the checkpoint has succeeded. That means if the kubelet is ever restarted
-				// before this checkpoint succeeds, the pod whose resources are being prepared
-				// would never have started, so it's OK (actually correct) to not include it in the cache.
+				// We delay checkpointing of this change until this call
+				// returns successfully. It is OK to do this because we
+				// will only return successfully from this call if the
+				// checkpoint has succeeded. That means if the kubelet is
+				// ever restarted before this checkpoint succeeds, the pod
+				// whose resources are being prepared would never have
+				// started, so it's OK (actually correct) to not include it
+				// in the cache.
 				claimInfo.addPodReference(pod.UID)
 				continue
 			}
@@ -98,48 +101,76 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 					pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
 			}
 
-			// Call NodePrepareResource RPC
-			driverName := resourceClaim.Status.DriverName
-
-			client, err := dra.NewDRAPluginClient(driverName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err)
+			// Build a slice of "resourceHandles" to group the name of the
+			// kubelet plugin to call NodePrepareResources() on and the
+			// ResourceHandle data to be processed by that plugin. For now
+			// this slice will only have a single entry, where the name of
+			// the kubelet plugin matches the DriverName. In the future we
+			// plan to allow each claim to be processed by multiple plugins
+			// (each with their own ResourceHandle) so this code is being
+			// written in a way to accommodate this.
+			resourceHandles := []struct {
+				KubeletPluginName string
+				Data              string
+			}{
+				{
+					KubeletPluginName: resourceClaim.Status.DriverName,
+					Data:              resourceClaim.Status.Allocation.ResourceHandle,
+				},
 			}
 
-			response, err := client.NodePrepareResource(
-				context.Background(),
-				resourceClaim.Namespace,
-				resourceClaim.UID,
-				resourceClaim.Name,
-				resourceClaim.Status.Allocation.ResourceHandle)
-			if err != nil {
-				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err)
-			}
-
-			klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
-
-			// TODO: We are adding the claimInfo struct to the cache and syncing it to the checkpoint *after* the NodePrepareResource
-			// call has completed. This will cause issues if the kubelet gets restarted between NodePrepareResource and syncToCheckpoint.
-			// It will result in not calling NodeUnprepareResource for this claim because no claimInfo will be synced back to the cache
-			// for it after the restart. We need to resolve this issue before moving to beta.
-			claimInfo, err := newClaimInfo(
-				driverName,
+			// Create a claimInfo object to store the relevant claim info.
+			claimInfo := newClaimInfo(
+				resourceClaim.Status.DriverName,
 				resourceClaim.UID,
 				resourceClaim.Name,
 				resourceClaim.Namespace,
 				sets.New(string(pod.UID)),
-				response.CdiDevices)
-			if err != nil {
-				return fmt.Errorf("newClaimInfo failed, claim UID: %s, claim name: %s, claim namespace: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceClaim.Namespace, err)
-			}
+			)
 
-			m.cache.add(claimInfo)
-			// Checkpoint to reduce redundant calls to NodePrepareResource() after a kubelet restart.
-			err = m.cache.syncToCheckpoint()
-			if err != nil {
-				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+			// Walk through each resourceHandle
+			for _, resourceHandle := range resourceHandles {
+				// Call NodePrepareResource RPC for each resourceHandle
+				client, err := dra.NewDRAPluginClient(resourceHandle.KubeletPluginName)
+				if err != nil {
+					return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", resourceHandle.KubeletPluginName, err)
+				}
+				response, err := client.NodePrepareResource(
+					context.Background(),
+					resourceClaim.Namespace,
+					resourceClaim.UID,
+					resourceClaim.Name,
+					resourceHandle.Data)
+				if err != nil {
+					return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
+						resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
+				}
+				klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", resourceHandle.KubeletPluginName, "response", response)
+
+				// Add the CDI Devices returned by NodePrepareResource to
+				// the claimInfo object.
+				err = claimInfo.addCDIDevices(resourceHandle.KubeletPluginName, response.CdiDevices)
+				if err != nil {
+					return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
+				}
+
+				// TODO: We (re)add the claimInfo object to the cache and
+				// sync it to the checkpoint *after* the
+				// NodePrepareResource call has completed. This will cause
+				// issues if the kubelet gets restarted between
+				// NodePrepareResource and syncToCheckpoint. It will result
+				// in not calling NodeUnprepareResource for this claim
+				// because no claimInfo will be synced back to the cache
+				// for it after the restart. We need to resolve this issue
+				// before moving to beta.
+				m.cache.add(claimInfo)
+
+				// Checkpoint to reduce redundant calls to
+				// NodePrepareResource() after a kubelet restart.
+				err = m.cache.syncToCheckpoint()
+				if err != nil {
+					return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+				}
 			}
 		}
 	}
@@ -173,8 +204,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 
 			klog.V(3).InfoS("Add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
 			annotations = append(annotations, claimInfo.annotations...)
-			for _, cdiDevice := range claimInfo.CdiDevices {
-				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: cdiDevice})
+			for _, devices := range claimInfo.CDIDevices {
+				for _, device := range devices {
+					cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
+				}
 			}
 		}
 	}
@@ -208,35 +241,36 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Call NodeUnprepareResource only for the last pod that references the claim
-		client, err := dra.NewDRAPluginClient(claimInfo.DriverName)
-		if err != nil {
-			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
+		// Loop through all plugins and call NodeUnprepareResource only for the
+		// last pod that references the claim
+		for pluginName, cdiDevices := range claimInfo.CDIDevices {
+			client, err := dra.NewDRAPluginClient(pluginName)
+			if err != nil {
+				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
+			}
+			response, err := client.NodeUnprepareResource(
+				context.Background(),
+				claimInfo.Namespace,
+				claimInfo.ClaimUID,
+				claimInfo.ClaimName,
+				cdiDevices)
+			if err != nil {
+				return fmt.Errorf(
+					"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
+					pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CDIDevices, err)
+			}
+			klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
 		}
 
-		response, err := client.NodeUnprepareResource(
-			context.Background(),
-			claimInfo.Namespace,
-			claimInfo.ClaimUID,
-			claimInfo.ClaimName,
-			claimInfo.CdiDevices)
-		if err != nil {
-			return fmt.Errorf(
-				"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
-				pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CdiDevices, err)
-		}
-
-		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
-
-		// Delete last pod UID only if NodeUnprepareResource call succeeds.
-		// This ensures that status manager doesn't enter termination status
-		// for the pod. This logic is implemented in the m.PodMightNeedToUnprepareResources
-		// and in the claimInfo.hasPodReference.
+		// Delete last pod UID only if all NodeUnprepareResource calls succeed.
+		// This ensures that the status manager doesn't enter termination status
+		// for the pod. This logic is implemented in
+		// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
 		claimInfo.deletePodReference(pod.UID)
-		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+
 		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
+		err := m.cache.syncToCheckpoint()
		if err != nil {
			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
		}
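Taken together, the PrepareResources() changes reduce to: build a list of (kubelet plugin name, resource handle) pairs, call NodePrepareResource() once per pair, and fold each plugin's CDI devices into the shared claimInfo. A hedged, self-contained sketch of that dispatch loop follows; the resourceHandle type, the plugin names, and the nodePrepareResource helper are placeholders, not the real DRA plugin client API:

// Illustrative sketch only: how the single-entry resourceHandles slice built
// in PrepareResources() generalizes once an allocation can carry one handle
// per kubelet plugin (the direction the v1alpha2 API is heading). The
// resourceHandle type and nodePrepareResource helper are placeholders, not
// the real dra plugin client.
package main

import "fmt"

type resourceHandle struct {
	KubeletPluginName string // which kubelet plugin must process this handle
	Data              string // opaque, plugin-specific payload
}

// nodePrepareResource stands in for the per-plugin NodePrepareResource RPC
// and returns the CDI device names that plugin prepared for the claim.
func nodePrepareResource(pluginName, data string) []string {
	return []string{fmt.Sprintf("%s/claim=%s", pluginName, data)}
}

func main() {
	// Today (v1alpha1): exactly one handle, with the plugin name equal to
	// the claim's DriverName. Later (v1alpha2): potentially one handle per
	// participating plugin, as sketched here with made-up names.
	resourceHandles := []resourceHandle{
		{KubeletPluginName: "gpu.example.com", Data: "gpu-handle"},
		{KubeletPluginName: "nic.example.com", Data: "nic-handle"},
	}

	// Accumulate CDI devices per plugin, mirroring claimInfo.CDIDevices.
	cdiDevices := map[string][]string{}
	for _, handle := range resourceHandles {
		cdiDevices[handle.KubeletPluginName] = nodePrepareResource(handle.KubeletPluginName, handle.Data)
	}
	fmt.Println(cdiDevices)
}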