diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 4ed1aa7b591..a85d3d5ff27 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -959,7 +959,6 @@ func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.C
 	}
 	for _, containerClaimInfo := range containerClaimInfos {
 		var claimResources []*podresourcesapi.ClaimResource
-		containerClaimInfo.RLock()
 		// TODO: Currently we maintain a list of ClaimResources, each of which contains
 		// a set of CDIDevices from a different kubelet plugin. In the future we may want to
 		// include the name of the kubelet plugin and/or other types of resources that are
@@ -971,7 +970,6 @@ func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.C
 			}
 			claimResources = append(claimResources, &podresourcesapi.ClaimResource{CDIDevices: cdiDevices})
 		}
-		containerClaimInfo.RUnlock()
 		containerDynamicResource := podresourcesapi.DynamicResource{
 			ClassName: containerClaimInfo.ClassName,
 			ClaimName: containerClaimInfo.ClaimName,
diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go
index d369b8d3e33..40a4cf40fbc 100644
--- a/pkg/kubelet/cm/dra/claiminfo.go
+++ b/pkg/kubelet/cm/dra/claiminfo.go
@@ -30,8 +30,8 @@ import (
 // ClaimInfo holds information required
 // to prepare and unprepare a resource claim.
+// +k8s:deepcopy-gen=true
 type ClaimInfo struct {
-	sync.RWMutex
 	state.ClaimInfoState
 	// annotations is a mapping of container annotations per DRA plugin associated with
 	// a prepared resource
@@ -39,24 +39,57 @@ type ClaimInfo struct {
 	prepared bool
 }
-func (info *ClaimInfo) addPodReference(podUID types.UID) {
-	info.Lock()
-	defer info.Unlock()
-
-	info.PodUIDs.Insert(string(podUID))
+// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
+type claimInfoCache struct {
+	sync.RWMutex
+	state state.CheckpointState
+	claimInfo map[string]*ClaimInfo
 }
-func (info *ClaimInfo) deletePodReference(podUID types.UID) {
-	info.Lock()
-	defer info.Unlock()
-
-	info.PodUIDs.Delete(string(podUID))
+// newClaimInfoFromClaim creates a new claim info from a resource claim.
+func newClaimInfoFromClaim(claim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
+	// Grab the allocation.resourceHandles. If there are no
+	// allocation.resourceHandles, create a single resourceHandle with no
+	// content. This will trigger processing of this claim by a single
+	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+	resourceHandles := claim.Status.Allocation.ResourceHandles
+	if len(resourceHandles) == 0 {
+		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+	}
+	claimInfoState := state.ClaimInfoState{
+		DriverName: claim.Status.DriverName,
+		ClassName: claim.Spec.ResourceClassName,
+		ClaimUID: claim.UID,
+		ClaimName: claim.Name,
+		Namespace: claim.Namespace,
+		PodUIDs: sets.New[string](),
+		ResourceHandles: resourceHandles,
+		CDIDevices: make(map[string][]string),
+	}
+	info := &ClaimInfo{
+		ClaimInfoState: claimInfoState,
+		annotations: make(map[string][]kubecontainer.Annotation),
+		prepared: false,
+	}
+	return info
 }
-func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
-	info.Lock()
-	defer info.Unlock()
+// newClaimInfoFromState creates a new claim info from a checkpointed claim info state object.
+func newClaimInfoFromState(state *state.ClaimInfoState) *ClaimInfo {
+	info := &ClaimInfo{
+		ClaimInfoState: *state.DeepCopy(),
+		annotations: make(map[string][]kubecontainer.Annotation),
+		prepared: false,
+	}
+	for pluginName, devices := range info.CDIDevices {
+		annotations, _ := cdi.GenerateAnnotations(info.ClaimUID, info.DriverName, devices)
+		info.annotations[pluginName] = append(info.annotations[pluginName], annotations...)
+	}
+	return info
+}
+// setCDIDevices adds a set of CDI devices to the claim info.
+func (info *ClaimInfo) setCDIDevices(pluginName string, cdiDevices []string) error {
 	// NOTE: Passing CDI device names as annotations is a temporary solution
 	// It will be removed after all runtimes are updated
 	// to get CDI device names from the ContainerConfig.CDIDevices field
@@ -77,9 +110,6 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
 // annotationsAsList returns container annotations as a single list.
 func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
-	info.RLock()
-	defer info.RUnlock()
-
 	var lst []kubecontainer.Annotation
 	for _, v := range info.annotations {
 		lst = append(lst, v...)
@@ -87,53 +117,43 @@ func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
 	return lst
 }
-// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
-type claimInfoCache struct {
-	sync.RWMutex
-	state state.CheckpointState
-	claimInfo map[string]*ClaimInfo
+// cdiDevicesAsList returns a list of CDIDevices from the provided claim info.
+func (info *ClaimInfo) cdiDevicesAsList() []kubecontainer.CDIDevice {
+	var cdiDevices []kubecontainer.CDIDevice
+	for _, devices := range info.CDIDevices {
+		for _, device := range devices {
+			cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
+		}
+	}
+	return cdiDevices
 }
-func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], resourceHandles []resourcev1alpha2.ResourceHandle) *ClaimInfo {
-	claimInfoState := state.ClaimInfoState{
-		DriverName: driverName,
-		ClassName: className,
-		ClaimUID: claimUID,
-		ClaimName: claimName,
-		Namespace: namespace,
-		PodUIDs: podUIDs,
-		ResourceHandles: resourceHandles,
-	}
-	claimInfo := ClaimInfo{
-		ClaimInfoState: claimInfoState,
-		annotations: make(map[string][]kubecontainer.Annotation),
-	}
-	return &claimInfo
+// addPodReference adds a pod reference to the claim info.
+func (info *ClaimInfo) addPodReference(podUID types.UID) {
+	info.PodUIDs.Insert(string(podUID))
 }
-// newClaimInfoFromResourceClaim creates a new ClaimInfo object
-func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
-	// Grab the allocation.resourceHandles. If there are no
-	// allocation.resourceHandles, create a single resourceHandle with no
-	// content. This will trigger processing of this claim by a single
-	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-	resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-	if len(resourceHandles) == 0 {
-		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-	}
-
-	return newClaimInfo(
-		resourceClaim.Status.DriverName,
-		resourceClaim.Spec.ResourceClassName,
-		resourceClaim.UID,
-		resourceClaim.Name,
-		resourceClaim.Namespace,
-		make(sets.Set[string]),
-		resourceHandles,
-	)
+// hasPodReference checks if a pod reference exists in the claim info.
+func (info *ClaimInfo) hasPodReference(podUID types.UID) bool {
+	return info.PodUIDs.Has(string(podUID))
 }
-// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
+// deletePodReference deletes a pod reference from the claim info.
+func (info *ClaimInfo) deletePodReference(podUID types.UID) {
+	info.PodUIDs.Delete(string(podUID))
+}
+
+// setPrepared marks the claim info as prepared.
+func (info *ClaimInfo) setPrepared() {
+	info.prepared = true
+}
+
+// isPrepared checks if claim info is prepared or not.
+func (info *ClaimInfo) isPrepared() bool {
+	return info.prepared
+}
+
+// newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present).
 func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
 	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
 	if err != nil {
@@ -151,45 +171,47 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
 	}
 	for _, entry := range curState {
-		info := newClaimInfo(
-			entry.DriverName,
-			entry.ClassName,
-			entry.ClaimUID,
-			entry.ClaimName,
-			entry.Namespace,
-			entry.PodUIDs,
-			entry.ResourceHandles,
-		)
-		for pluginName, cdiDevices := range entry.CDIDevices {
-			err := info.addCDIDevices(pluginName, cdiDevices)
-			if err != nil {
-				return nil, fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", info, err)
-			}
-		}
-		cache.add(info)
+		info := newClaimInfoFromState(&entry)
+		cache.claimInfo[info.ClaimName+info.Namespace] = info
 	}
 	return cache, nil
 }
-func (cache *claimInfoCache) add(res *ClaimInfo) {
+// withLock runs a function while holding the claimInfoCache lock.
+func (cache *claimInfoCache) withLock(f func() error) error {
 	cache.Lock()
 	defer cache.Unlock()
-
-	cache.claimInfo[res.ClaimName+res.Namespace] = res
+	return f()
 }
-func (cache *claimInfoCache) get(claimName, namespace string) *ClaimInfo {
+// withRLock runs a function while holding the claimInfoCache rlock.
+func (cache *claimInfoCache) withRLock(f func() error) error {
 	cache.RLock()
 	defer cache.RUnlock()
-
-	return cache.claimInfo[claimName+namespace]
+	return f()
 }
-func (cache *claimInfoCache) delete(claimName, namespace string) {
-	cache.Lock()
-	defer cache.Unlock()
+// add adds a new claim info object into the claim info cache.
+func (cache *claimInfoCache) add(info *ClaimInfo) *ClaimInfo {
+	cache.claimInfo[info.ClaimName+info.Namespace] = info
+	return info
+}
+// contains checks to see if a specific claim info object is already in the cache.
+func (cache *claimInfoCache) contains(claimName, namespace string) bool {
+	_, exists := cache.claimInfo[claimName+namespace]
+	return exists
+}
+
+// get gets a specific claim info object from the cache.
+func (cache *claimInfoCache) get(claimName, namespace string) (*ClaimInfo, bool) {
+	info, exists := cache.claimInfo[claimName+namespace]
+	return info, exists
+}
+
+// delete deletes a specific claim info object from the cache.
+func (cache *claimInfoCache) delete(claimName, namespace string) {
 	delete(cache.claimInfo, claimName+namespace)
 }
@@ -198,26 +220,19 @@ func (cache *claimInfoCache) delete(claimName, namespace string) {
 // hasPodReference checks if there is at least one pod claim with specified podUID
 // This function is used indirectly by the status manager
 // to check if pod can enter termination status
 func (cache *claimInfoCache) hasPodReference(UID types.UID) bool {
-	cache.RLock()
-	defer cache.RUnlock()
-
 	for _, claimInfo := range cache.claimInfo {
-		if claimInfo.PodUIDs.Has(string(UID)) {
+		if claimInfo.hasPodReference(UID) {
 			return true
 		}
 	}
-
 	return false
 }
+// syncToCheckpoint syncs the full claim info cache state to a checkpoint.
 func (cache *claimInfoCache) syncToCheckpoint() error {
-	cache.RLock()
-	defer cache.RUnlock()
-
 	claimInfoStateList := make(state.ClaimInfoStateList, 0, len(cache.claimInfo))
 	for _, infoClaim := range cache.claimInfo {
 		claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
 	}
-
 	return cache.state.Store(claimInfoStateList)
 }
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index ad9b17dfbd8..f55037ef7d3 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -67,7 +67,7 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
 // containerResources on success.
 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 	batches := make(map[string][]*drapb.Claim)
-	claimInfos := make(map[types.UID]*ClaimInfo)
+	resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim)
 	for i := range pod.Spec.ResourceClaims {
 		podClaim := &pod.Spec.ResourceClaims[i]
 		klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
@@ -108,48 +108,60 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			continue
 		}
-		claimInfo := m.cache.get(*claimName, pod.Namespace)
-		if claimInfo == nil {
-			// claim does not exist in cache, create new claimInfo object
-			// to be processed later.
-			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
-		}
-
-		// We delay checkpointing of this change until this call
-		// returns successfully. It is OK to do this because we
-		// will only return successfully from this call if the
-		// checkpoint has succeeded. That means if the kubelet is
-		// ever restarted before this checkpoint succeeds, the pod
-		// whose resources are being prepared would never have
-		// started, so it's OK (actually correct) to not include it
-		// in the cache.
-		claimInfo.addPodReference(pod.UID)
-
-		if claimInfo.prepared {
-			// Already prepared this claim, no need to prepare it again
-			continue
-		}
-
-		// Loop through all plugins and prepare for calling NodePrepareResources.
-		for _, resourceHandle := range claimInfo.ResourceHandles {
-			// If no DriverName is provided in the resourceHandle, we
-			// use the DriverName from the status
-			pluginName := resourceHandle.DriverName
-			if pluginName == "" {
-				pluginName = resourceClaim.Status.DriverName
+		// Atomically perform some operations on the claimInfo cache.
+		err = m.cache.withLock(func() error {
+			// Get a reference to the claim info for this claim from the cache.
+			// If there isn't one yet, then add it to the cache.
+			claimInfo, exists := m.cache.get(resourceClaim.Name, resourceClaim.Namespace)
+			if !exists {
+				claimInfo = m.cache.add(newClaimInfoFromClaim(resourceClaim))
 			}
-			claim := &drapb.Claim{
-				Namespace: resourceClaim.Namespace,
-				Uid: string(resourceClaim.UID),
-				Name: resourceClaim.Name,
-				ResourceHandle: resourceHandle.Data,
+
+			// Add a reference to the current pod in the claim info.
+			claimInfo.addPodReference(pod.UID)
+
+			// Checkpoint to ensure all claims we plan to prepare are tracked.
+			// If something goes wrong and the newly referenced pod gets
+			// deleted without a successful prepare call, we will catch
+			// that in the reconcile loop and take the appropriate action.
+			if err := m.cache.syncToCheckpoint(); err != nil {
+				return fmt.Errorf("failed to checkpoint claimInfo state: %w", err)
 			}
-			if resourceHandle.StructuredData != nil {
-				claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
+
+			// If this claim is already prepared, there is no need to prepare it again.
+			if claimInfo.isPrepared() {
+				return nil
 			}
-			batches[pluginName] = append(batches[pluginName], claim)
+
+			// This saved claim will be used to update ClaimInfo cache
+			// after NodePrepareResources GRPC succeeds
+			resourceClaims[claimInfo.ClaimUID] = resourceClaim
+
+			// Loop through all plugins and prepare for calling NodePrepareResources.
+			for _, resourceHandle := range claimInfo.ResourceHandles {
+				// If no DriverName is provided in the resourceHandle, we
+				// use the DriverName from the status
+				pluginName := resourceHandle.DriverName
+				if pluginName == "" {
+					pluginName = claimInfo.DriverName
+				}
+				claim := &drapb.Claim{
+					Namespace: claimInfo.Namespace,
+					Uid: string(claimInfo.ClaimUID),
+					Name: claimInfo.ClaimName,
+					ResourceHandle: resourceHandle.Data,
+				}
+				if resourceHandle.StructuredData != nil {
+					claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
+				}
+				batches[pluginName] = append(batches[pluginName], claim)
+			}
+
+			return nil
+		})
+		if err != nil {
+			return fmt.Errorf("locked cache operation: %w", err)
 		}
-		claimInfos[resourceClaim.UID] = claimInfo
 	}
 	// Call NodePrepareResources for all claims in each batch.
@@ -175,34 +187,22 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
 		}
-		claimInfo := claimInfos[types.UID(claimUID)]
+		claim := resourceClaims[types.UID(claimUID)]
-		// Add the CDI Devices returned by NodePrepareResources to
-		// the claimInfo object.
-		err = claimInfo.addCDIDevices(pluginName, result.GetCDIDevices())
+		// Add the prepared CDI devices to the claim info
+		err := m.cache.withLock(func() error {
+			info, exists := m.cache.get(claim.Name, claim.Namespace)
+			if !exists {
+				return fmt.Errorf("unable to get claim info for claim %s in namespace %s", claim.Name, claim.Namespace)
+			}
+			if err := info.setCDIDevices(pluginName, result.GetCDIDevices()); err != nil {
+				return fmt.Errorf("unable to add CDI devices for plugin %s of claim %s in namespace %s", pluginName, claim.Name, claim.Namespace)
+			}
+			return nil
+		})
 		if err != nil {
-			return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
+			return fmt.Errorf("locked cache operation: %w", err)
 		}
-		// mark claim as (successfully) prepared by manager, so next time we don't prepare it.
-		claimInfo.prepared = true
-
-		// TODO: We (re)add the claimInfo object to the cache and
-		// sync it to the checkpoint *after* the
-		// NodePrepareResources call has completed. This will cause
-		// issues if the kubelet gets restarted between
-		// NodePrepareResources and syncToCheckpoint. It will result
-		// in not calling NodeUnprepareResources for this claim
-		// because no claimInfo will be synced back to the cache
-		// for it after the restart. We need to resolve this issue
-		// before moving to beta.
-		m.cache.add(claimInfo)
-	}
-
-	// Checkpoint to reduce redundant calls to
-	// NodePrepareResources after a kubelet restart.
-	err = m.cache.syncToCheckpoint()
-	if err != nil {
-		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
-	}
 	}
 		unfinished := len(claims) - len(response.Claims)
@@ -210,11 +210,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
 		}
 	}
-	// Checkpoint to capture all of the previous addPodReference() calls.
-	err := m.cache.syncToCheckpoint()
+
+	// Atomically perform some operations on the claimInfo cache.
+	err := m.cache.withLock(func() error {
+		// Mark all pod claims as prepared.
+		for _, claim := range resourceClaims {
+			info, exists := m.cache.get(claim.Name, claim.Namespace)
+			if !exists {
+				return fmt.Errorf("unable to get claim info for claim %s in namespace %s", claim.Name, claim.Namespace)
+			}
+			info.setPrepared()
+		}
+
+		// Checkpoint to ensure all prepared claims are tracked with their list
+		// of CDI devices attached.
+		if err := m.cache.syncToCheckpoint(); err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state: %w", err)
+		}
+
+		return nil
+	})
 	if err != nil {
-		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+		return fmt.Errorf("locked cache operation: %w", err)
 	}
+
 	return nil
 }
@@ -277,21 +296,25 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 			continue
 		}
-		claimInfo := m.cache.get(*claimName, pod.Namespace)
-		if claimInfo == nil {
-			return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
-		}
-
-		claimInfo.RLock()
-		claimAnnotations := claimInfo.annotationsAsList()
-		klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
-		annotations = append(annotations, claimAnnotations...)
-		for _, devices := range claimInfo.CDIDevices {
-			for _, device := range devices {
-				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
+		err := m.cache.withRLock(func() error {
+			claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
+			if !exists {
+				return fmt.Errorf("unable to get claim info for claim %s in namespace %s", *claimName, pod.Namespace)
 			}
+
+			claimAnnotations := claimInfo.annotationsAsList()
+			klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
+			annotations = append(annotations, claimAnnotations...)
+
+			devices := claimInfo.cdiDevicesAsList()
+			klog.V(3).InfoS("Add CDI devices", "claim", *claimName, "CDI devices", devices)
+			cdiDevices = append(cdiDevices, devices...)
+
+			return nil
+		})
+		if err != nil {
+			return nil, fmt.Errorf("locked cache operation: %w", err)
 		}
-		claimInfo.RUnlock()
 		}
 	}
@@ -303,60 +326,79 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
 // already been successfully unprepared.
 func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
-	batches := make(map[string][]*drapb.Claim)
-	claimInfos := make(map[types.UID]*ClaimInfo)
+	var claimNames []string
 	for i := range pod.Spec.ResourceClaims {
 		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 		if err != nil {
 			return fmt.Errorf("unprepare resource claim: %v", err)
 		}
-
 		// The claim name might be nil if no underlying resource claim
 		// was generated for the referenced claim. There are valid use
 		// cases when this might happen, so we simply skip it.
 		if claimName == nil {
 			continue
 		}
+		claimNames = append(claimNames, *claimName)
+	}
+	return m.unprepareResources(pod.UID, pod.Namespace, claimNames)
+}
-		claimInfo := m.cache.get(*claimName, pod.Namespace)
+func (m *ManagerImpl) unprepareResources(podUID types.UID, namespace string, claimNames []string) error {
+	batches := make(map[string][]*drapb.Claim)
+	claimNamesMap := make(map[types.UID]string)
+	for _, claimName := range claimNames {
+		// Atomically perform some operations on the claimInfo cache.
+		err := m.cache.withLock(func() error {
+			// Get the claim info from the cache
+			claimInfo, exists := m.cache.get(claimName, namespace)
-		// Skip calling NodeUnprepareResource if claim info is not cached
-		if claimInfo == nil {
-			continue
-		}
-
-		// Skip calling NodeUnprepareResource if other pods are still referencing it
-		if len(claimInfo.PodUIDs) > 1 {
-			// We delay checkpointing of this change until this call returns successfully.
-			// It is OK to do this because we will only return successfully from this call if
-			// the checkpoint has succeeded. That means if the kubelet is ever restarted
-			// before this checkpoint succeeds, we will simply call into this (idempotent)
-			// function again.
-			claimInfo.deletePodReference(pod.UID)
-			continue
-		}
-
-		// Loop through all plugins and prepare for calling NodeUnprepareResources.
-		for _, resourceHandle := range claimInfo.ResourceHandles {
-			// If no DriverName is provided in the resourceHandle, we
-			// use the DriverName from the status
-			pluginName := resourceHandle.DriverName
-			if pluginName == "" {
-				pluginName = claimInfo.DriverName
+			// Skip calling NodeUnprepareResource if claim info is not cached
+			if !exists {
+				return nil
 			}
-			claim := &drapb.Claim{
-				Namespace: claimInfo.Namespace,
-				Uid: string(claimInfo.ClaimUID),
-				Name: claimInfo.ClaimName,
-				ResourceHandle: resourceHandle.Data,
+			// Skip calling NodeUnprepareResource if other pods are still referencing it
+			if len(claimInfo.PodUIDs) > 1 {
+				// We delay checkpointing of this change until
+				// UnprepareResources returns successfully. It is OK to do
+				// this because we will only return successfully from this call
+				// if the checkpoint has succeeded. That means if the kubelet
+				// is ever restarted before this checkpoint succeeds, we will
+				// simply call into this (idempotent) function again.
+				claimInfo.deletePodReference(podUID)
+				return nil
 			}
-			if resourceHandle.StructuredData != nil {
-				claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
+
+			// This claimInfo name will be used to update ClaimInfo cache
+			// after NodeUnprepareResources GRPC succeeds
+			claimNamesMap[claimInfo.ClaimUID] = claimInfo.ClaimName
+
+			// Loop through all plugins and prepare for calling NodeUnprepareResources.
+			for _, resourceHandle := range claimInfo.ResourceHandles {
+				// If no DriverName is provided in the resourceHandle, we
+				// use the DriverName from the status
+				pluginName := resourceHandle.DriverName
+				if pluginName == "" {
+					pluginName = claimInfo.DriverName
+				}
+
+				claim := &drapb.Claim{
+					Namespace: claimInfo.Namespace,
+					Uid: string(claimInfo.ClaimUID),
+					Name: claimInfo.ClaimName,
+					ResourceHandle: resourceHandle.Data,
+				}
+				if resourceHandle.StructuredData != nil {
+					claim.StructuredResourceHandle = []*resourceapi.StructuredResourceHandle{resourceHandle.StructuredData}
+				}
+				batches[pluginName] = append(batches[pluginName], claim)
 			}
-			batches[pluginName] = append(batches[pluginName], claim)
+
+			return nil
+		})
+		if err != nil {
+			return fmt.Errorf("locked cache operation: %w", err)
 		}
-		claimInfos[claimInfo.ClaimUID] = claimInfo
 	}
 	// Call NodeUnprepareResources for all claims in each batch.
@@ -382,20 +424,6 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			if result.GetError() != "" {
 				return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
 			}
-
-			// Delete last pod UID only if unprepare succeeds.
-			// This ensures that the status manager doesn't enter termination status
-			// for the pod. This logic is implemented in
-			// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
-			claimInfo := claimInfos[types.UID(claimUID)]
-			claimInfo.deletePodReference(pod.UID)
-			m.cache.delete(claimInfo.ClaimName, pod.Namespace)
-		}
-
-		// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
-		if err != nil {
-			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}
 		unfinished := len(claims) - len(response.Claims)
@@ -404,21 +432,35 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 	}
-	// Checkpoint to capture all of the previous deletePodReference() calls.
-	err := m.cache.syncToCheckpoint()
+	// Atomically perform some operations on the claimInfo cache.
+	err := m.cache.withLock(func() error {
+		// Delete all claimInfos from the cache that have just been unprepared.
+		for _, claimName := range claimNamesMap {
+			m.cache.delete(claimName, namespace)
+		}
+
+		// Atomically sync the cache back to the checkpoint.
+		if err := m.cache.syncToCheckpoint(); err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state: %w", err)
+		}
+		return nil
+	})
 	if err != nil {
-		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+		return fmt.Errorf("locked cache operation: %w", err)
 	}
+
 	return nil
 }
 // PodMightNeedToUnprepareResources returns true if the pod might need to
 // unprepare resources
 func (m *ManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
+	m.cache.Lock()
+	defer m.cache.Unlock()
 	return m.cache.hasPodReference(UID)
 }
-// GetCongtainerClaimInfos gets Container's ClaimInfo
+// GetContainerClaimInfos gets Container's ClaimInfo
 func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
 	claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
@@ -432,11 +474,18 @@ func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Containe
 			if podResourceClaim.Name != claim.Name {
 				continue
 			}
-			claimInfo := m.cache.get(*claimName, pod.Namespace)
-			if claimInfo == nil {
-				return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
+
+			err := m.cache.withRLock(func() error {
+				claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
+				if !exists {
+					return fmt.Errorf("unable to get claim info for claim %s in namespace %s", *claimName, pod.Namespace)
+				}
+				claimInfos = append(claimInfos, claimInfo.DeepCopy())
+				return nil
+			})
+			if err != nil {
+				return nil, fmt.Errorf("locked cache operation: %w", err)
 			}
-			claimInfos = append(claimInfos, claimInfo)
 		}
 	}
 	return claimInfos, nil
diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint.go b/pkg/kubelet/cm/dra/state/state_checkpoint.go
index a391f0a13ca..a82f6b11bb4 100644
--- a/pkg/kubelet/cm/dra/state/state_checkpoint.go
+++ b/pkg/kubelet/cm/dra/state/state_checkpoint.go
@@ -36,6 +36,7 @@ type CheckpointState interface {
 }
 // ClaimInfoState is used to store claim info state in a checkpoint
+// +k8s:deepcopy-gen=true
 type ClaimInfoState struct {
 	// Name of the DRA driver
 	DriverName string
diff --git a/pkg/kubelet/cm/dra/state/zz_generated.deepcopy.go b/pkg/kubelet/cm/dra/state/zz_generated.deepcopy.go
new file mode 100644
index 00000000000..d27ecf60883
--- /dev/null
+++ b/pkg/kubelet/cm/dra/state/zz_generated.deepcopy.go
@@ -0,0 +1,72 @@
+//go:build !ignore_autogenerated
+// +build !ignore_autogenerated
+
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Code generated by deepcopy-gen. DO NOT EDIT.
+
+package state
+
+import (
+	v1alpha2 "k8s.io/api/resource/v1alpha2"
+	sets "k8s.io/apimachinery/pkg/util/sets"
+)
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClaimInfoState) DeepCopyInto(out *ClaimInfoState) {
+	*out = *in
+	if in.PodUIDs != nil {
+		in, out := &in.PodUIDs, &out.PodUIDs
+		*out = make(sets.Set[string], len(*in))
+		for key, val := range *in {
+			(*out)[key] = val
+		}
+	}
+	if in.ResourceHandles != nil {
+		in, out := &in.ResourceHandles, &out.ResourceHandles
+		*out = make([]v1alpha2.ResourceHandle, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+	if in.CDIDevices != nil {
+		in, out := &in.CDIDevices, &out.CDIDevices
+		*out = make(map[string][]string, len(*in))
+		for key, val := range *in {
+			var outVal []string
+			if val == nil {
+				(*out)[key] = nil
+			} else {
+				in, out := &val, &outVal
+				*out = make([]string, len(*in))
+				copy(*out, *in)
+			}
+			(*out)[key] = outVal
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClaimInfoState.
+func (in *ClaimInfoState) DeepCopy() *ClaimInfoState {
+	if in == nil {
+		return nil
+	}
+	out := new(ClaimInfoState)
+	in.DeepCopyInto(out)
+	return out
+}
diff --git a/pkg/kubelet/cm/dra/zz_generated.deepcopy.go b/pkg/kubelet/cm/dra/zz_generated.deepcopy.go
new file mode 100644
index 00000000000..cc10fdaf53e
--- /dev/null
+++ b/pkg/kubelet/cm/dra/zz_generated.deepcopy.go
@@ -0,0 +1,58 @@
+//go:build !ignore_autogenerated
+// +build !ignore_autogenerated
+
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Code generated by deepcopy-gen. DO NOT EDIT.
+
+package dra
+
+import (
+	container "k8s.io/kubernetes/pkg/kubelet/container"
+)
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ClaimInfo) DeepCopyInto(out *ClaimInfo) {
+	*out = *in
+	in.ClaimInfoState.DeepCopyInto(&out.ClaimInfoState)
+	if in.annotations != nil {
+		in, out := &in.annotations, &out.annotations
+		*out = make(map[string][]container.Annotation, len(*in))
+		for key, val := range *in {
+			var outVal []container.Annotation
+			if val == nil {
+				(*out)[key] = nil
+			} else {
+				in, out := &val, &outVal
+				*out = make([]container.Annotation, len(*in))
+				copy(*out, *in)
+			}
+			(*out)[key] = outVal
+		}
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClaimInfo.
+func (in *ClaimInfo) DeepCopy() *ClaimInfo {
+	if in == nil {
+		return nil
+	}
+	out := new(ClaimInfo)
+	in.DeepCopyInto(out)
+	return out
+}
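
Editor's note: the sketch below is an illustration only, not part of the patch. It reduces the locking pattern introduced above (the cache owns the single RWMutex, withLock/withRLock make multi-step operations atomic, and readers receive deep copies instead of pointers into the cache) to a self-contained Go program. Apart from the withLock and withRLock helper names, every identifier in it (entryCache, Entry, and their fields) is invented for the example and does not exist in the kubelet sources.

// Editor's sketch only; see the note above.
package main

import (
	"fmt"
	"sync"
)

// Entry plays the role of ClaimInfo in the patch: plain data with no embedded mutex.
type Entry struct {
	Name string
	Pods map[string]bool
}

// DeepCopy returns an independent copy, analogous to the generated ClaimInfo.DeepCopy.
func (e *Entry) DeepCopy() *Entry {
	out := &Entry{Name: e.Name, Pods: make(map[string]bool, len(e.Pods))}
	for k, v := range e.Pods {
		out.Pods[k] = v
	}
	return out
}

// entryCache plays the role of claimInfoCache: the cache owns the only lock.
type entryCache struct {
	sync.RWMutex
	entries map[string]*Entry
}

// withLock runs f while holding the write lock, so a get-or-add plus any
// follow-up mutation is atomic with respect to other callers.
func (c *entryCache) withLock(f func() error) error {
	c.Lock()
	defer c.Unlock()
	return f()
}

// withRLock runs f while holding the read lock.
func (c *entryCache) withRLock(f func() error) error {
	c.RLock()
	defer c.RUnlock()
	return f()
}

func main() {
	cache := &entryCache{entries: make(map[string]*Entry)}

	// Write path: look up, create if missing, and mutate under one lock,
	// mirroring how PrepareResources uses m.cache.withLock in the patch.
	_ = cache.withLock(func() error {
		e, ok := cache.entries["claim-1"]
		if !ok {
			e = &Entry{Name: "claim-1", Pods: make(map[string]bool)}
			cache.entries["claim-1"] = e
		}
		e.Pods["pod-a"] = true
		return nil
	})

	// Read path: hand out a deep copy so the caller never holds a pointer into
	// cache-owned data outside the lock, mirroring GetContainerClaimInfos.
	var snapshot *Entry
	err := cache.withRLock(func() error {
		e, ok := cache.entries["claim-1"]
		if !ok {
			return fmt.Errorf("claim-1 not cached")
		}
		snapshot = e.DeepCopy()
		return nil
	})
	if err != nil {
		fmt.Println("locked cache operation:", err)
		return
	}
	fmt.Println(snapshot.Name, len(snapshot.Pods))
}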