kubelet: DRA: update locking/checkpoint semantics of the claimInfo cache

Signed-off-by: Kevin Klues <kklues@nvidia.com>
Kevin Klues 2024-04-30 11:09:19 +00:00
parent bac1a0ce1b
commit a8931c6c25
6 changed files with 425 additions and 232 deletions
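For context on the title: before this change every ClaimInfo embedded its own sync.RWMutex and callers locked each object individually; after it, the claimInfoCache owns a single RWMutex and callers batch their lookups, mutations, and checkpointing inside withLock/withRLock closures. Below is a minimal, self-contained sketch of that pattern using simplified stand-in types (cache, claimInfo), not the kubelet's real ones:

package main

import (
	"fmt"
	"sync"
)

// claimInfo stands in for the kubelet's ClaimInfo: plain data, no embedded mutex.
type claimInfo struct {
	name, namespace string
	prepared        bool
}

// cache serializes every read/write through its own RWMutex, mirroring claimInfoCache.
type cache struct {
	sync.RWMutex
	byKey map[string]*claimInfo
}

// withLock runs f while holding the write lock, so get/add/checkpoint happen atomically.
func (c *cache) withLock(f func() error) error {
	c.Lock()
	defer c.Unlock()
	return f()
}

// withRLock runs f while holding the read lock.
func (c *cache) withRLock(f func() error) error {
	c.RLock()
	defer c.RUnlock()
	return f()
}

func main() {
	c := &cache{byKey: map[string]*claimInfo{}}
	// Writers batch all mutations (and, in the kubelet, the checkpoint sync) in one critical section.
	_ = c.withLock(func() error {
		info := &claimInfo{name: "claim-1", namespace: "default"}
		c.byKey[info.name+info.namespace] = info
		info.prepared = true
		return nil
	})
	// Readers take the shared lock for the whole lookup.
	_ = c.withRLock(func() error {
		if info, ok := c.byKey["claim-1default"]; ok {
			fmt.Println("prepared:", info.prepared)
		}
		return nil
	})
}

The point of the closure helpers is that a lookup, a mutation, and the checkpoint write can no longer interleave with another goroutine's view of the cache.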

View File

@@ -959,7 +959,6 @@ func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.C
 	}
 	for _, containerClaimInfo := range containerClaimInfos {
 		var claimResources []*podresourcesapi.ClaimResource
-		containerClaimInfo.RLock()
 		// TODO: Currently we maintain a list of ClaimResources, each of which contains
 		// a set of CDIDevices from a different kubelet plugin. In the future we may want to
 		// include the name of the kubelet plugin and/or other types of resources that are
@@ -971,7 +970,6 @@ func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.C
 			}
 			claimResources = append(claimResources, &podresourcesapi.ClaimResource{CDIDevices: cdiDevices})
 		}
-		containerClaimInfo.RUnlock()
 		containerDynamicResource := podresourcesapi.DynamicResource{
 			ClassName: containerClaimInfo.ClassName,
 			ClaimName: containerClaimInfo.ClaimName,
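The per-claim RLock/RUnlock can be dropped here because GetContainerClaimInfos (changed later in this commit) now returns deep copies taken under the cache's read lock, so this loop iterates over private snapshots. A rough, self-contained sketch of what the loop assembles, using simplified stand-ins for the podresources API types (cdiDevice, claimResource, buildClaimResources are hypothetical names):

package main

import "fmt"

// Simplified stand-ins for the podresources API types used in GetDynamicResources.
type cdiDevice struct{ Name string }
type claimResource struct{ CDIDevices []cdiDevice }

// buildClaimResources mirrors the loop above: one ClaimResource per kubelet
// plugin, each carrying that plugin's CDI device names. No locking is needed
// because the claim info it reads is already a private deep copy.
func buildClaimResources(cdiDevicesByPlugin map[string][]string) []claimResource {
	var out []claimResource
	for _, devices := range cdiDevicesByPlugin {
		var cdi []cdiDevice
		for _, name := range devices {
			cdi = append(cdi, cdiDevice{Name: name})
		}
		out = append(out, claimResource{CDIDevices: cdi})
	}
	return out
}

func main() {
	res := buildClaimResources(map[string][]string{
		"gpu.example.com": {"example.com/gpu=0", "example.com/gpu=1"},
	})
	fmt.Printf("%+v\n", res)
}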

View File

@@ -30,8 +30,8 @@ import (
 // ClaimInfo holds information required
 // to prepare and unprepare a resource claim.
+// +k8s:deepcopy-gen=true
 type ClaimInfo struct {
-	sync.RWMutex
 	state.ClaimInfoState
 	// annotations is a mapping of container annotations per DRA plugin associated with
 	// a prepared resource
@@ -39,24 +39,57 @@ type ClaimInfo struct {
 	prepared bool
 }
 
-func (info *ClaimInfo) addPodReference(podUID types.UID) {
-	info.Lock()
-	defer info.Unlock()
-
-	info.PodUIDs.Insert(string(podUID))
-}
-
-func (info *ClaimInfo) deletePodReference(podUID types.UID) {
-	info.Lock()
-	defer info.Unlock()
-
-	info.PodUIDs.Delete(string(podUID))
-}
-
-func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
-	info.Lock()
-	defer info.Unlock()
-
+// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
+type claimInfoCache struct {
+	sync.RWMutex
+	state     state.CheckpointState
+	claimInfo map[string]*ClaimInfo
+}
+
+// newClaimInfoFromClaim creates a new claim info from a resource claim.
+func newClaimInfoFromClaim(claim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
+	// Grab the allocation.resourceHandles. If there are no
+	// allocation.resourceHandles, create a single resourceHandle with no
+	// content. This will trigger processing of this claim by a single
+	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+	resourceHandles := claim.Status.Allocation.ResourceHandles
+	if len(resourceHandles) == 0 {
+		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+	}
+	claimInfoState := state.ClaimInfoState{
+		DriverName:      claim.Status.DriverName,
+		ClassName:       claim.Spec.ResourceClassName,
+		ClaimUID:        claim.UID,
+		ClaimName:       claim.Name,
+		Namespace:       claim.Namespace,
+		PodUIDs:         sets.New[string](),
+		ResourceHandles: resourceHandles,
+		CDIDevices:      make(map[string][]string),
+	}
+	info := &ClaimInfo{
+		ClaimInfoState: claimInfoState,
+		annotations:    make(map[string][]kubecontainer.Annotation),
+		prepared:       false,
+	}
+	return info
+}
+
+// newClaimInfoFromState creates a new claim info from a checkpointed claim info state object.
+func newClaimInfoFromState(state *state.ClaimInfoState) *ClaimInfo {
+	info := &ClaimInfo{
+		ClaimInfoState: *state.DeepCopy(),
+		annotations:    make(map[string][]kubecontainer.Annotation),
+		prepared:       false,
+	}
+	for pluginName, devices := range info.CDIDevices {
+		annotations, _ := cdi.GenerateAnnotations(info.ClaimUID, info.DriverName, devices)
+		info.annotations[pluginName] = append(info.annotations[pluginName], annotations...)
+	}
+	return info
+}
+
+// setCDIDevices adds a set of CDI devices to the claim info.
+func (info *ClaimInfo) setCDIDevices(pluginName string, cdiDevices []string) error {
 	// NOTE: Passing CDI device names as annotations is a temporary solution
 	// It will be removed after all runtimes are updated
 	// to get CDI device names from the ContainerConfig.CDIDevices field
@@ -77,9 +110,6 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
 // annotationsAsList returns container annotations as a single list.
 func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
-	info.RLock()
-	defer info.RUnlock()
-
 	var lst []kubecontainer.Annotation
 	for _, v := range info.annotations {
 		lst = append(lst, v...)
@@ -87,53 +117,43 @@ func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
 	return lst
 }
 
-// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
-type claimInfoCache struct {
-	sync.RWMutex
-	state     state.CheckpointState
-	claimInfo map[string]*ClaimInfo
-}
-
-func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], resourceHandles []resourcev1alpha2.ResourceHandle) *ClaimInfo {
-	claimInfoState := state.ClaimInfoState{
-		DriverName:      driverName,
-		ClassName:       className,
-		ClaimUID:        claimUID,
-		ClaimName:       claimName,
-		Namespace:       namespace,
-		PodUIDs:         podUIDs,
-		ResourceHandles: resourceHandles,
-	}
-	claimInfo := ClaimInfo{
-		ClaimInfoState: claimInfoState,
-		annotations:    make(map[string][]kubecontainer.Annotation),
-	}
-	return &claimInfo
-}
-
-// newClaimInfoFromResourceClaim creates a new ClaimInfo object
-func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
-	// Grab the allocation.resourceHandles. If there are no
-	// allocation.resourceHandles, create a single resourceHandle with no
-	// content. This will trigger processing of this claim by a single
-	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-	resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-	if len(resourceHandles) == 0 {
-		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-	}
-
-	return newClaimInfo(
-		resourceClaim.Status.DriverName,
-		resourceClaim.Spec.ResourceClassName,
-		resourceClaim.UID,
-		resourceClaim.Name,
-		resourceClaim.Namespace,
-		make(sets.Set[string]),
-		resourceHandles,
-	)
-}
-
-// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
+// cdiDevicesAsList returns a list of CDIDevices from the provided claim info.
+func (info *ClaimInfo) cdiDevicesAsList() []kubecontainer.CDIDevice {
+	var cdiDevices []kubecontainer.CDIDevice
+	for _, devices := range info.CDIDevices {
+		for _, device := range devices {
+			cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
+		}
+	}
+	return cdiDevices
+}
+
+// addPodReference adds a pod reference to the claim info.
+func (info *ClaimInfo) addPodReference(podUID types.UID) {
+	info.PodUIDs.Insert(string(podUID))
+}
+
+// hasPodReference checks if a pod reference exists in the claim info.
+func (info *ClaimInfo) hasPodReference(podUID types.UID) bool {
+	return info.PodUIDs.Has(string(podUID))
+}
+
+// deletePodReference deletes a pod reference from the claim info.
+func (info *ClaimInfo) deletePodReference(podUID types.UID) {
+	info.PodUIDs.Delete(string(podUID))
+}
+
+// setPrepared marks the claim info as prepared.
+func (info *ClaimInfo) setPrepared() {
+	info.prepared = true
+}
+
+// isPrepared checks if claim info is prepared or not.
+func (info *ClaimInfo) isPrepared() bool {
+	return info.prepared
+}
+
+// newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present).
 func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
 	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
 	if err != nil {
@@ -151,45 +171,47 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
 	}
 
 	for _, entry := range curState {
-		info := newClaimInfo(
-			entry.DriverName,
-			entry.ClassName,
-			entry.ClaimUID,
-			entry.ClaimName,
-			entry.Namespace,
-			entry.PodUIDs,
-			entry.ResourceHandles,
-		)
-		for pluginName, cdiDevices := range entry.CDIDevices {
-			err := info.addCDIDevices(pluginName, cdiDevices)
-			if err != nil {
-				return nil, fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", info, err)
-			}
-		}
-		cache.add(info)
+		info := newClaimInfoFromState(&entry)
+		cache.claimInfo[info.ClaimName+info.Namespace] = info
 	}
 
 	return cache, nil
 }
 
-func (cache *claimInfoCache) add(res *ClaimInfo) {
+// withLock runs a function while holding the claimInfoCache lock.
+func (cache *claimInfoCache) withLock(f func() error) error {
 	cache.Lock()
 	defer cache.Unlock()
-
-	cache.claimInfo[res.ClaimName+res.Namespace] = res
+	return f()
 }
 
-func (cache *claimInfoCache) get(claimName, namespace string) *ClaimInfo {
+// withRLock runs a function while holding the claimInfoCache rlock.
+func (cache *claimInfoCache) withRLock(f func() error) error {
 	cache.RLock()
 	defer cache.RUnlock()
-
-	return cache.claimInfo[claimName+namespace]
+	return f()
 }
 
-func (cache *claimInfoCache) delete(claimName, namespace string) {
-	cache.Lock()
-	defer cache.Unlock()
-
+// add adds a new claim info object into the claim info cache.
+func (cache *claimInfoCache) add(info *ClaimInfo) *ClaimInfo {
+	cache.claimInfo[info.ClaimName+info.Namespace] = info
+	return info
+}
+
+// contains checks to see if a specific claim info object is already in the cache.
+func (cache *claimInfoCache) contains(claimName, namespace string) bool {
+	_, exists := cache.claimInfo[claimName+namespace]
+	return exists
+}
+
+// get gets a specific claim info object from the cache.
+func (cache *claimInfoCache) get(claimName, namespace string) (*ClaimInfo, bool) {
+	info, exists := cache.claimInfo[claimName+namespace]
+	return info, exists
+}
+
+// delete deletes a specific claim info object from the cache.
+func (cache *claimInfoCache) delete(claimName, namespace string) {
 	delete(cache.claimInfo, claimName+namespace)
 }
@@ -198,26 +220,19 @@ func (cache *claimInfoCache) delete(claimName, namespace string) {
 // This function is used indirectly by the status manager
 // to check if pod can enter termination status
 func (cache *claimInfoCache) hasPodReference(UID types.UID) bool {
-	cache.RLock()
-	defer cache.RUnlock()
-
 	for _, claimInfo := range cache.claimInfo {
-		if claimInfo.PodUIDs.Has(string(UID)) {
+		if claimInfo.hasPodReference(UID) {
 			return true
 		}
 	}
 	return false
 }
 
+// syncToCheckpoint syncs the full claim info cache state to a checkpoint.
 func (cache *claimInfoCache) syncToCheckpoint() error {
-	cache.RLock()
-	defer cache.RUnlock()
-
 	claimInfoStateList := make(state.ClaimInfoStateList, 0, len(cache.claimInfo))
 	for _, infoClaim := range cache.claimInfo {
 		claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
 	}
 	return cache.state.Store(claimInfoStateList)
 }
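Worth noting how the cache survives a kubelet restart: syncToCheckpoint persists only the ClaimInfoState list, and newClaimInfoCache feeds each checkpointed entry through newClaimInfoFromState, which regenerates the in-memory-only fields (the CDI annotations; prepared starts out false, which is safe because NodePrepareResources is idempotent). A simplified, self-contained sketch of that reload step, with stand-in types and a fake annotation format in place of cdi.GenerateAnnotations:

package main

import "fmt"

// claimState is a stand-in for state.ClaimInfoState: only the persisted fields.
type claimState struct {
	ClaimName, Namespace string
	CDIDevices           map[string][]string
}

// claimInfo mirrors ClaimInfo: persisted state plus derived, in-memory-only fields.
type claimInfo struct {
	claimState
	annotations []string // regenerated from CDIDevices, never checkpointed
	prepared    bool     // always false right after a restart
}

// newClaimInfoFromState rebuilds the derived fields from the persisted ones,
// the same way the kubelet regenerates CDI annotations when loading a checkpoint.
func newClaimInfoFromState(s claimState) *claimInfo {
	info := &claimInfo{claimState: s}
	for plugin, devices := range s.CDIDevices {
		for _, d := range devices {
			info.annotations = append(info.annotations, plugin+"="+d)
		}
	}
	return info
}

func main() {
	// Simulate what newClaimInfoCache does for each checkpointed entry.
	persisted := claimState{
		ClaimName:  "gpu-claim",
		Namespace:  "default",
		CDIDevices: map[string][]string{"gpu.example.com": {"example.com/gpu=0"}},
	}
	info := newClaimInfoFromState(persisted)
	fmt.Println(info.ClaimName+info.Namespace, info.annotations, info.prepared)
}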

View File

@@ -67,7 +67,7 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
 // containerResources on success.
 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 	batches := make(map[string][]*drapb.Claim)
-	claimInfos := make(map[types.UID]*ClaimInfo)
+	resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim)
 	for i := range pod.Spec.ResourceClaims {
 		podClaim := &pod.Spec.ResourceClaims[i]
 		klog.V(3).InfoS("Processing resource", "podClaim", podClaim.Name, "pod", pod.Name)
@@ -108,40 +108,47 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		claimInfo := m.cache.get(*claimName, pod.Namespace)
-		if claimInfo == nil {
-			// claim does not exist in cache, create new claimInfo object
-			// to be processed later.
-			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
-		}
-
-		// We delay checkpointing of this change until this call
-		// returns successfully. It is OK to do this because we
-		// will only return successfully from this call if the
-		// checkpoint has succeeded. That means if the kubelet is
-		// ever restarted before this checkpoint succeeds, the pod
-		// whose resources are being prepared would never have
-		// started, so it's OK (actually correct) to not include it
-		// in the cache.
-		claimInfo.addPodReference(pod.UID)
-
-		if claimInfo.prepared {
-			// Already prepared this claim, no need to prepare it again
-			continue
-		}
-
-		// Loop through all plugins and prepare for calling NodePrepareResources.
-		for _, resourceHandle := range claimInfo.ResourceHandles {
-			// If no DriverName is provided in the resourceHandle, we
-			// use the DriverName from the status
-			pluginName := resourceHandle.DriverName
-			if pluginName == "" {
-				pluginName = resourceClaim.Status.DriverName
-			}
-			claim := &drapb.Claim{
-				Namespace:      resourceClaim.Namespace,
-				Uid:            string(resourceClaim.UID),
-				Name:           resourceClaim.Name,
-				ResourceHandle: resourceHandle.Data,
-			}
+		// Atomically perform some operations on the claimInfo cache.
+		err = m.cache.withLock(func() error {
+			// Get a reference to the claim info for this claim from the cache.
+			// If there isn't one yet, then add it to the cache.
+			claimInfo, exists := m.cache.get(resourceClaim.Name, resourceClaim.Namespace)
+			if !exists {
+				claimInfo = m.cache.add(newClaimInfoFromClaim(resourceClaim))
+			}
+
+			// Add a reference to the current pod in the claim info.
+			claimInfo.addPodReference(pod.UID)
+
+			// Checkpoint to ensure all claims we plan to prepare are tracked.
+			// If something goes wrong and the newly referenced pod gets
+			// deleted without a successful prepare call, we will catch
+			// that in the reconcile loop and take the appropriate action.
+			if err := m.cache.syncToCheckpoint(); err != nil {
+				return fmt.Errorf("failed to checkpoint claimInfo state: %w", err)
+			}
+
+			// If this claim is already prepared, there is no need to prepare it again.
+			if claimInfo.isPrepared() {
+				return nil
+			}
+
+			// This saved claim will be used to update ClaimInfo cache
+			// after NodePrepareResources GRPC succeeds
+			resourceClaims[claimInfo.ClaimUID] = resourceClaim
+
+			// Loop through all plugins and prepare for calling NodePrepareResources.
+			for _, resourceHandle := range claimInfo.ResourceHandles {
+				// If no DriverName is provided in the resourceHandle, we
+				// use the DriverName from the status
+				pluginName := resourceHandle.DriverName
+				if pluginName == "" {
+					pluginName = claimInfo.DriverName
+				}
+				claim := &drapb.Claim{
+					Namespace:      claimInfo.Namespace,
+					Uid:            string(claimInfo.ClaimUID),
+					Name:           claimInfo.ClaimName,
+					ResourceHandle: resourceHandle.Data,
+				}
 				if resourceHandle.StructuredData != nil {
@@ -149,7 +156,12 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 				}
 				batches[pluginName] = append(batches[pluginName], claim)
 			}
-			claimInfos[resourceClaim.UID] = claimInfo
+			return nil
+		})
+		if err != nil {
+			return fmt.Errorf("locked cache operation: %w", err)
+		}
 	}
 
 	// Call NodePrepareResources for all claims in each batch.
@@ -175,34 +187,22 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 				return fmt.Errorf("NodePrepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
 			}
 
-			claimInfo := claimInfos[types.UID(claimUID)]
-
-			// Add the CDI Devices returned by NodePrepareResources to
-			// the claimInfo object.
-			err = claimInfo.addCDIDevices(pluginName, result.GetCDIDevices())
-			if err != nil {
-				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
-			}
-			// mark claim as (successfully) prepared by manager, so next time we don't prepare it.
-			claimInfo.prepared = true
-
-			// TODO: We (re)add the claimInfo object to the cache and
-			// sync it to the checkpoint *after* the
-			// NodePrepareResources call has completed. This will cause
-			// issues if the kubelet gets restarted between
-			// NodePrepareResources and syncToCheckpoint. It will result
-			// in not calling NodeUnprepareResources for this claim
-			// because no claimInfo will be synced back to the cache
-			// for it after the restart. We need to resolve this issue
-			// before moving to beta.
-			m.cache.add(claimInfo)
-		}
-
-		// Checkpoint to reduce redundant calls to
-		// NodePrepareResources after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
-		if err != nil {
-			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
-		}
+			claim := resourceClaims[types.UID(claimUID)]
+
+			// Add the prepared CDI devices to the claim info
+			err := m.cache.withLock(func() error {
+				info, exists := m.cache.get(claim.Name, claim.Namespace)
+				if !exists {
+					return fmt.Errorf("unable to get claim info for claim %s in namespace %s", claim.Name, claim.Namespace)
+				}
+				if err := info.setCDIDevices(pluginName, result.GetCDIDevices()); err != nil {
+					return fmt.Errorf("unable to add CDI devices for plugin %s of claim %s in namespace %s", pluginName, claim.Name, claim.Namespace)
+				}
+				return nil
+			})
+			if err != nil {
+				return fmt.Errorf("locked cache operation: %w", err)
+			}
+		}
 
 		unfinished := len(claims) - len(response.Claims)
@@ -210,11 +210,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			return fmt.Errorf("NodePrepareResources left out %d claims", unfinished)
 		}
 	}
 
-	// Checkpoint to capture all of the previous addPodReference() calls.
-	err := m.cache.syncToCheckpoint()
-	if err != nil {
-		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+	// Atomically perform some operations on the claimInfo cache.
+	err := m.cache.withLock(func() error {
+		// Mark all pod claims as prepared.
+		for _, claim := range resourceClaims {
+			info, exists := m.cache.get(claim.Name, claim.Namespace)
+			if !exists {
+				return fmt.Errorf("unable to get claim info for claim %s in namespace %s", claim.Name, claim.Namespace)
+			}
+			info.setPrepared()
+		}
+
+		// Checkpoint to ensure all prepared claims are tracked with their list
+		// of CDI devices attached.
+		if err := m.cache.syncToCheckpoint(); err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state: %w", err)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("locked cache operation: %w", err)
 	}
 
 	return nil
 }
@@ -277,22 +296,26 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 				continue
 			}
 
-			claimInfo := m.cache.get(*claimName, pod.Namespace)
-			if claimInfo == nil {
-				return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
-			}
-
-			claimInfo.RLock()
-			claimAnnotations := claimInfo.annotationsAsList()
-			klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
-			annotations = append(annotations, claimAnnotations...)
-			for _, devices := range claimInfo.CDIDevices {
-				for _, device := range devices {
-					cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
-				}
-			}
-			claimInfo.RUnlock()
+			err := m.cache.withRLock(func() error {
+				claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
+				if !exists {
+					return fmt.Errorf("unable to get claim info for claim %s in namespace %s", *claimName, pod.Namespace)
+				}
+
+				claimAnnotations := claimInfo.annotationsAsList()
+				klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
+				annotations = append(annotations, claimAnnotations...)
+
+				devices := claimInfo.cdiDevicesAsList()
+				klog.V(3).InfoS("Add CDI devices", "claim", *claimName, "CDI devices", devices)
+				cdiDevices = append(cdiDevices, devices...)
+
+				return nil
+			})
+			if err != nil {
+				return nil, fmt.Errorf("locked cache operation: %w", err)
+			}
 		}
 	}
 
 	return &ContainerInfo{Annotations: annotations, CDIDevices: cdiDevices}, nil
@@ -303,39 +326,53 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
 // already been successfully unprepared.
 func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
-	batches := make(map[string][]*drapb.Claim)
-	claimInfos := make(map[types.UID]*ClaimInfo)
+	var claimNames []string
 	for i := range pod.Spec.ResourceClaims {
 		claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 		if err != nil {
 			return fmt.Errorf("unprepare resource claim: %v", err)
 		}
 
 		// The claim name might be nil if no underlying resource claim
 		// was generated for the referenced claim. There are valid use
 		// cases when this might happen, so we simply skip it.
 		if claimName == nil {
 			continue
 		}
+		claimNames = append(claimNames, *claimName)
+	}
+	return m.unprepareResources(pod.UID, pod.Namespace, claimNames)
+}
 
-		claimInfo := m.cache.get(*claimName, pod.Namespace)
-
-		// Skip calling NodeUnprepareResource if claim info is not cached
-		if claimInfo == nil {
-			continue
-		}
-
-		// Skip calling NodeUnprepareResource if other pods are still referencing it
-		if len(claimInfo.PodUIDs) > 1 {
-			// We delay checkpointing of this change until this call returns successfully.
-			// It is OK to do this because we will only return successfully from this call if
-			// the checkpoint has succeeded. That means if the kubelet is ever restarted
-			// before this checkpoint succeeds, we will simply call into this (idempotent)
-			// function again.
-			claimInfo.deletePodReference(pod.UID)
-			continue
-		}
+func (m *ManagerImpl) unprepareResources(podUID types.UID, namespace string, claimNames []string) error {
+	batches := make(map[string][]*drapb.Claim)
+	claimNamesMap := make(map[types.UID]string)
+	for _, claimName := range claimNames {
+		// Atomically perform some operations on the claimInfo cache.
+		err := m.cache.withLock(func() error {
+			// Get the claim info from the cache
+			claimInfo, exists := m.cache.get(claimName, namespace)
+			// Skip calling NodeUnprepareResource if claim info is not cached
+			if !exists {
+				return nil
+			}
+
+			// Skip calling NodeUnprepareResource if other pods are still referencing it
+			if len(claimInfo.PodUIDs) > 1 {
+				// We delay checkpointing of this change until
+				// UnprepareResources returns successfully. It is OK to do
+				// this because we will only return successfully from this call
+				// if the checkpoint has succeeded. That means if the kubelet
+				// is ever restarted before this checkpoint succeeds, we will
+				// simply call into this (idempotent) function again.
+				claimInfo.deletePodReference(podUID)
+				return nil
+			}
+
+			// This claimInfo name will be used to update ClaimInfo cache
+			// after NodeUnprepareResources GRPC succeeds
+			claimNamesMap[claimInfo.ClaimUID] = claimInfo.ClaimName
 
 			// Loop through all plugins and prepare for calling NodeUnprepareResources.
 			for _, resourceHandle := range claimInfo.ResourceHandles {
 				// If no DriverName is provided in the resourceHandle, we
@@ -356,7 +393,12 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 				}
 				batches[pluginName] = append(batches[pluginName], claim)
 			}
-		claimInfos[claimInfo.ClaimUID] = claimInfo
+
+			return nil
+		})
+		if err != nil {
+			return fmt.Errorf("locked cache operation: %w", err)
+		}
 	}
 
 	// Call NodeUnprepareResources for all claims in each batch.
@@ -382,20 +424,6 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			if result.GetError() != "" {
 				return fmt.Errorf("NodeUnprepareResources failed for claim %s/%s: %s", reqClaim.Namespace, reqClaim.Name, result.Error)
 			}
-
-			// Delete last pod UID only if unprepare succeeds.
-			// This ensures that the status manager doesn't enter termination status
-			// for the pod. This logic is implemented in
-			// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
-			claimInfo := claimInfos[types.UID(claimUID)]
-			claimInfo.deletePodReference(pod.UID)
-			m.cache.delete(claimInfo.ClaimName, pod.Namespace)
-		}
-
-		// Checkpoint to reduce redundant calls to NodeUnprepareResources after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
-		if err != nil {
-			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}
 
 		unfinished := len(claims) - len(response.Claims)
@@ -404,21 +432,35 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 		}
 	}
 
-	// Checkpoint to capture all of the previous deletePodReference() calls.
-	err := m.cache.syncToCheckpoint()
-	if err != nil {
-		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+	// Atomically perform some operations on the claimInfo cache.
+	err := m.cache.withLock(func() error {
+		// Delete all claimInfos from the cache that have just been unprepared.
+		for _, claimName := range claimNamesMap {
+			m.cache.delete(claimName, namespace)
+		}
+
+		// Atomically sync the cache back to the checkpoint.
+		if err := m.cache.syncToCheckpoint(); err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state: %w", err)
+		}
+
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("locked cache operation: %w", err)
 	}
 
 	return nil
 }
 
 // PodMightNeedToUnprepareResources returns true if the pod might need to
 // unprepare resources
 func (m *ManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
+	m.cache.Lock()
+	defer m.cache.Unlock()
 	return m.cache.hasPodReference(UID)
 }
 
-// GetCongtainerClaimInfos gets Container's ClaimInfo
+// GetContainerClaimInfos gets Container's ClaimInfo
 func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Container) ([]*ClaimInfo, error) {
 	claimInfos := make([]*ClaimInfo, 0, len(pod.Spec.ResourceClaims))
@@ -432,11 +474,18 @@ func (m *ManagerImpl) GetContainerClaimInfos(pod *v1.Pod, container *v1.Containe
 			if podResourceClaim.Name != claim.Name {
 				continue
 			}
-			claimInfo := m.cache.get(*claimName, pod.Namespace)
-			if claimInfo == nil {
-				return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, *claimName)
+
+			err := m.cache.withRLock(func() error {
+				claimInfo, exists := m.cache.get(*claimName, pod.Namespace)
+				if !exists {
+					return fmt.Errorf("unable to get claim info for claim %s in namespace %s", *claimName, pod.Namespace)
+				}
+				claimInfos = append(claimInfos, claimInfo.DeepCopy())
+				return nil
+			})
+			if err != nil {
+				return nil, fmt.Errorf("locked cache operation: %w", err)
 			}
-			claimInfos = append(claimInfos, claimInfo)
 		}
 	}
 	return claimInfos, nil
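Taken together, PrepareResources now touches the cache lock in separate phases: record the claim and pod reference and checkpoint, release the lock for the NodePrepareResources gRPC, then re-take it to store the CDI devices, mark the claim prepared, and checkpoint again. Below is a small, self-contained sketch of that ordering for a single claim, with hypothetical names (cache, checkpoint, prepareClaim) standing in for the kubelet's types:

package main

import (
	"fmt"
	"sync"
)

// cache is a stand-in for the kubelet's claimInfoCache.
type cache struct {
	sync.Mutex
	prepared map[string]bool
}

// withLock runs f while holding the cache lock, mirroring claimInfoCache.withLock.
func (c *cache) withLock(f func() error) error {
	c.Lock()
	defer c.Unlock()
	return f()
}

// checkpoint stands in for syncToCheckpoint; the real one writes a state file.
func (c *cache) checkpoint() error { return nil }

// prepareClaim mirrors the ordering PrepareResources uses after this change:
//  1. lock: record the claim/pod reference and checkpoint it,
//  2. call the DRA plugin with the lock released,
//  3. lock: record the result and checkpoint again.
func prepareClaim(c *cache, claim string, plugin func(string) error) error {
	alreadyPrepared := false
	if err := c.withLock(func() error {
		if c.prepared[claim] {
			alreadyPrepared = true
			return nil
		}
		return c.checkpoint() // track the claim before calling out to the plugin
	}); err != nil {
		return err
	}
	if alreadyPrepared {
		return nil
	}

	if err := plugin(claim); err != nil { // NodePrepareResources happens outside the lock
		return err
	}

	return c.withLock(func() error {
		c.prepared[claim] = true
		return c.checkpoint() // persist success so a restart does not re-prepare
	})
}

func main() {
	c := &cache{prepared: map[string]bool{}}
	err := prepareClaim(c, "default/gpu-claim", func(name string) error {
		fmt.Println("NodePrepareResources for", name)
		return nil
	})
	fmt.Println("err:", err)
}

Keeping the plugin call outside the critical section is what lets the checkpoint stay consistent without blocking other pods on a slow driver.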

View File

@@ -36,6 +36,7 @@ type CheckpointState interface {
 }
 
 // ClaimInfoState is used to store claim info state in a checkpoint
+// +k8s:deepcopy-gen=true
 type ClaimInfoState struct {
 	// Name of the DRA driver
 	DriverName string

View File

@@ -0,0 +1,72 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package state
import (
	v1alpha2 "k8s.io/api/resource/v1alpha2"
	sets "k8s.io/apimachinery/pkg/util/sets"
)

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ClaimInfoState) DeepCopyInto(out *ClaimInfoState) {
	*out = *in
	if in.PodUIDs != nil {
		in, out := &in.PodUIDs, &out.PodUIDs
		*out = make(sets.Set[string], len(*in))
		for key, val := range *in {
			(*out)[key] = val
		}
	}
	if in.ResourceHandles != nil {
		in, out := &in.ResourceHandles, &out.ResourceHandles
		*out = make([]v1alpha2.ResourceHandle, len(*in))
		for i := range *in {
			(*in)[i].DeepCopyInto(&(*out)[i])
		}
	}
	if in.CDIDevices != nil {
		in, out := &in.CDIDevices, &out.CDIDevices
		*out = make(map[string][]string, len(*in))
		for key, val := range *in {
			var outVal []string
			if val == nil {
				(*out)[key] = nil
			} else {
				in, out := &val, &outVal
				*out = make([]string, len(*in))
				copy(*out, *in)
			}
			(*out)[key] = outVal
		}
	}
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClaimInfoState.
func (in *ClaimInfoState) DeepCopy() *ClaimInfoState {
	if in == nil {
		return nil
	}
	out := new(ClaimInfoState)
	in.DeepCopyInto(out)
	return out
}

View File

@@ -0,0 +1,58 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package dra
import (
	container "k8s.io/kubernetes/pkg/kubelet/container"
)

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ClaimInfo) DeepCopyInto(out *ClaimInfo) {
	*out = *in
	in.ClaimInfoState.DeepCopyInto(&out.ClaimInfoState)
	if in.annotations != nil {
		in, out := &in.annotations, &out.annotations
		*out = make(map[string][]container.Annotation, len(*in))
		for key, val := range *in {
			var outVal []container.Annotation
			if val == nil {
				(*out)[key] = nil
			} else {
				in, out := &val, &outVal
				*out = make([]container.Annotation, len(*in))
				copy(*out, *in)
			}
			(*out)[key] = outVal
		}
	}
	return
}

// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ClaimInfo.
func (in *ClaimInfo) DeepCopy() *ClaimInfo {
	if in == nil {
		return nil
	}
	out := new(ClaimInfo)
	in.DeepCopyInto(out)
	return out
}
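The generated DeepCopy is what lets readers get out of the lock quickly: GetContainerClaimInfos appends claimInfo.DeepCopy() while holding the read lock, so the pod-resources server works on a snapshot and the per-ClaimInfo mutex removed at the top of this commit is no longer needed. A tiny, self-contained illustration of that isolation, with a simplified stand-in type rather than the generated code above:

package main

import "fmt"

// info is a simplified stand-in for ClaimInfo with one reference-typed field.
type info struct {
	name       string
	cdiDevices map[string][]string
}

// deepCopy mirrors what the generated DeepCopy does: every reference-typed
// field is cloned into fresh storage so the caller's snapshot is isolated.
func (in *info) deepCopy() *info {
	out := &info{name: in.name, cdiDevices: make(map[string][]string, len(in.cdiDevices))}
	for k, v := range in.cdiDevices {
		out.cdiDevices[k] = append([]string(nil), v...)
	}
	return out
}

func main() {
	cached := &info{name: "gpu-claim", cdiDevices: map[string][]string{"driver": {"vendor.com/gpu=0"}}}
	snapshot := cached.deepCopy() // what a reader like GetContainerClaimInfos hands back

	// A later mutation of the cached object does not leak into the snapshot,
	// which is why readers no longer need a per-ClaimInfo lock.
	cached.cdiDevices["driver"] = append(cached.cdiDevices["driver"], "vendor.com/gpu=1")
	fmt.Println(len(snapshot.cdiDevices["driver"]), len(cached.cdiDevices["driver"])) // 1 2
}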