diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go
index 7266f9e72b2..d369b8d3e33 100644
--- a/pkg/kubelet/cm/dra/claiminfo.go
+++ b/pkg/kubelet/cm/dra/claiminfo.go
@@ -33,9 +33,10 @@ import (
 type ClaimInfo struct {
 	sync.RWMutex
 	state.ClaimInfoState
-	// annotations is a list of container annotations associated with
+	// annotations is a mapping of container annotations per DRA plugin associated with
 	// a prepared resource
-	annotations []kubecontainer.Annotation
+	annotations map[string][]kubecontainer.Annotation
+	prepared    bool
 }
 
 func (info *ClaimInfo) addPodReference(podUID types.UID) {
@@ -69,11 +70,23 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
 	}
 
 	info.CDIDevices[pluginName] = cdiDevices
-	info.annotations = append(info.annotations, annotations...)
+	info.annotations[pluginName] = annotations
 
 	return nil
 }
 
+// annotationsAsList returns container annotations as a single list.
+func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
+	info.RLock()
+	defer info.RUnlock()
+
+	var lst []kubecontainer.Annotation
+	for _, v := range info.annotations {
+		lst = append(lst, v...)
+	}
+	return lst
+}
+
 // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
 type claimInfoCache struct {
 	sync.RWMutex
@@ -93,10 +106,33 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n
 	}
 	claimInfo := ClaimInfo{
 		ClaimInfoState: claimInfoState,
+		annotations:    make(map[string][]kubecontainer.Annotation),
 	}
 	return &claimInfo
 }
 
+// newClaimInfoFromResourceClaim creates a new ClaimInfo object
+func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
+	// Grab the allocation.resourceHandles. If there are no
+	// allocation.resourceHandles, create a single resourceHandle with no
+	// content. This will trigger processing of this claim by a single
+	// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+	resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
+	if len(resourceHandles) == 0 {
+		resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+	}
+
+	return newClaimInfo(
+		resourceClaim.Status.DriverName,
+		resourceClaim.Spec.ResourceClassName,
+		resourceClaim.UID,
+		resourceClaim.Name,
+		resourceClaim.Namespace,
+		make(sets.Set[string]),
+		resourceHandles,
+	)
+}
+
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
 func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
 	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index 703eae58b4f..62a2bd4cd4f 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -21,10 +21,8 @@ import (
 	"fmt"
 
 	v1 "k8s.io/api/core/v1"
-	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/sets"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
@@ -109,42 +107,30 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Is the resource already prepared? Then add the pod UID to it.
-		if claimInfo := m.cache.get(*claimName, pod.Namespace); claimInfo != nil {
-			// We delay checkpointing of this change until this call
-			// returns successfully. It is OK to do this because we
-			// will only return successfully from this call if the
-			// checkpoint has succeeded. That means if the kubelet is
-			// ever restarted before this checkpoint succeeds, the pod
-			// whose resources are being prepared would never have
-			// started, so it's OK (actually correct) to not include it
-			// in the cache.
-			claimInfo.addPodReference(pod.UID)
+		claimInfo := m.cache.get(*claimName, pod.Namespace)
+		if claimInfo == nil {
+			// claim does not exist in cache, create new claimInfo object
+			// to be processed later.
+			claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
+		}
+
+		// We delay checkpointing of this change until this call
+		// returns successfully. It is OK to do this because we
+		// will only return successfully from this call if the
+		// checkpoint has succeeded. That means if the kubelet is
+		// ever restarted before this checkpoint succeeds, the pod
+		// whose resources are being prepared would never have
+		// started, so it's OK (actually correct) to not include it
+		// in the cache.
+		claimInfo.addPodReference(pod.UID)
+
+		if claimInfo.prepared {
+			// Already prepared this claim, no need to prepare it again
 			continue
 		}
 
-		// Grab the allocation.resourceHandles. If there are no
-		// allocation.resourceHandles, create a single resourceHandle with no
-		// content. This will trigger processing of this claim by a single
-		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-		if len(resourceHandles) == 0 {
-			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-		}
-
-		// Create a claimInfo object to store the relevant claim info.
-		claimInfo := newClaimInfo(
-			resourceClaim.Status.DriverName,
-			resourceClaim.Spec.ResourceClassName,
-			resourceClaim.UID,
-			resourceClaim.Name,
-			resourceClaim.Namespace,
-			sets.New(string(pod.UID)),
-			resourceHandles,
-		)
-
 		// Loop through all plugins and prepare for calling NodePrepareResources.
-		for _, resourceHandle := range resourceHandles {
+		for _, resourceHandle := range claimInfo.ResourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
@@ -193,6 +179,8 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			if err != nil {
 				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 			}
+			// mark claim as (successfully) prepared by manager, so next time we dont prepare it.
+			claimInfo.prepared = true
 
 			// TODO: We (re)add the claimInfo object to the cache and
 			// sync it to the checkpoint *after* the
@@ -291,8 +279,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 		}
 
 		claimInfo.RLock()
-		klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimInfo.annotations)
-		annotations = append(annotations, claimInfo.annotations...)
+		claimAnnotations := claimInfo.annotationsAsList()
+		klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
+		annotations = append(annotations, claimAnnotations...)
 		for _, devices := range claimInfo.CDIDevices {
 			for _, device := range devices {
 				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
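For reviewers, here is a minimal standalone sketch of the bookkeeping this diff switches to: annotations keyed per DRA plugin plus a flattening helper. The Annotation struct below stands in for kubecontainer.Annotation, the claimInfo type and addCDIAnnotations method are simplified illustrations (not the kubelet code), and the plugin/annotation names are made up; only annotationsAsList mirrors the patched method directly.

package main

import (
	"fmt"
	"sync"
)

// Annotation stands in for kubecontainer.Annotation in this sketch.
type Annotation struct {
	Name  string
	Value string
}

// claimInfo mirrors the relevant parts of the patched ClaimInfo: annotations
// are kept per DRA plugin instead of in one flat slice, and prepared records
// whether NodePrepareResources has already succeeded for this claim.
type claimInfo struct {
	sync.RWMutex
	annotations map[string][]Annotation
	prepared    bool
}

// addCDIAnnotations records a plugin's annotations, overwriting any previous
// entry for that plugin, which is what keeps repeated prepares idempotent.
func (c *claimInfo) addCDIAnnotations(pluginName string, anns []Annotation) {
	c.Lock()
	defer c.Unlock()
	c.annotations[pluginName] = anns
}

// annotationsAsList flattens the per-plugin map back into a single slice,
// as GetResources needs when assembling container annotations.
func (c *claimInfo) annotationsAsList() []Annotation {
	c.RLock()
	defer c.RUnlock()
	var lst []Annotation
	for _, v := range c.annotations {
		lst = append(lst, v...)
	}
	return lst
}

func main() {
	info := &claimInfo{annotations: make(map[string][]Annotation)}
	info.addCDIAnnotations("driver-a.example.com", []Annotation{{Name: "cdi.k8s.io/claim1", Value: "example.com/gpu=gpu0"}})
	info.addCDIAnnotations("driver-b.example.com", []Annotation{{Name: "cdi.k8s.io/claim2", Value: "example.com/nic=nic0"}})

	// Preparing the same plugin again replaces, rather than appends to, its
	// annotations, unlike the previous append-to-one-slice behavior.
	info.addCDIAnnotations("driver-a.example.com", []Annotation{{Name: "cdi.k8s.io/claim1", Value: "example.com/gpu=gpu1"}})

	fmt.Println(info.annotationsAsList())
}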