DRA: call plugins for claims even if exist in cache

Today, DRA manager does not call plugin NodePrepareResource
for claims that it previously successfully handled, that is,
if claims are present in cache (checkpoint) even if node
rebooted.

After node reboots, it is required to call DRA plugin
for resource claims so that plugins may prepare them
again in case the resources dont persist reboot.

To achieve that, once kubelet is started, we call DRA
plugins for claims once if a pod sandbox is required
to be created during PodSync.

Signed-off-by: adrianc <adrianc@nvidia.com>
This commit is contained in:
adrianc 2023-09-10 17:12:31 +03:00
parent 755644a169
commit 08b942028f
No known key found for this signature in database
GPG Key ID: CC911D9D88FC463E
2 changed files with 64 additions and 39 deletions

View File

@ -33,9 +33,10 @@ import (
type ClaimInfo struct { type ClaimInfo struct {
sync.RWMutex sync.RWMutex
state.ClaimInfoState state.ClaimInfoState
// annotations is a list of container annotations associated with // annotations is a mapping of container annotations per DRA plugin associated with
// a prepared resource // a prepared resource
annotations []kubecontainer.Annotation annotations map[string][]kubecontainer.Annotation
prepared bool
} }
func (info *ClaimInfo) addPodReference(podUID types.UID) { func (info *ClaimInfo) addPodReference(podUID types.UID) {
@ -69,11 +70,23 @@ func (info *ClaimInfo) addCDIDevices(pluginName string, cdiDevices []string) err
} }
info.CDIDevices[pluginName] = cdiDevices info.CDIDevices[pluginName] = cdiDevices
info.annotations = append(info.annotations, annotations...) info.annotations[pluginName] = annotations
return nil return nil
} }
// annotationsAsList returns container annotations as a single list.
func (info *ClaimInfo) annotationsAsList() []kubecontainer.Annotation {
info.RLock()
defer info.RUnlock()
var lst []kubecontainer.Annotation
for _, v := range info.annotations {
lst = append(lst, v...)
}
return lst
}
// claimInfoCache is a cache of processed resource claims keyed by namespace + claim name. // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
type claimInfoCache struct { type claimInfoCache struct {
sync.RWMutex sync.RWMutex
@ -93,10 +106,33 @@ func newClaimInfo(driverName, className string, claimUID types.UID, claimName, n
} }
claimInfo := ClaimInfo{ claimInfo := ClaimInfo{
ClaimInfoState: claimInfoState, ClaimInfoState: claimInfoState,
annotations: make(map[string][]kubecontainer.Annotation),
} }
return &claimInfo return &claimInfo
} }
// newClaimInfoFromResourceClaim creates a new ClaimInfo object
func newClaimInfoFromResourceClaim(resourceClaim *resourcev1alpha2.ResourceClaim) *ClaimInfo {
// Grab the allocation.resourceHandles. If there are no
// allocation.resourceHandles, create a single resourceHandle with no
// content. This will trigger processing of this claim by a single
// kubelet plugin whose name matches resourceClaim.Status.DriverName.
resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
if len(resourceHandles) == 0 {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}
return newClaimInfo(
resourceClaim.Status.DriverName,
resourceClaim.Spec.ResourceClassName,
resourceClaim.UID,
resourceClaim.Name,
resourceClaim.Namespace,
make(sets.Set[string]),
resourceHandles,
)
}
// newClaimInfoCache is a function that returns an instance of the claimInfoCache. // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) { func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
stateImpl, err := state.NewCheckpointState(stateDir, checkpointName) stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)

View File

@ -21,10 +21,8 @@ import (
"fmt" "fmt"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2" "k8s.io/klog/v2"
@ -109,8 +107,13 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
continue continue
} }
// Is the resource already prepared? Then add the pod UID to it. claimInfo := m.cache.get(*claimName, pod.Namespace)
if claimInfo := m.cache.get(*claimName, pod.Namespace); claimInfo != nil { if claimInfo == nil {
// claim does not exist in cache, create new claimInfo object
// to be processed later.
claimInfo = newClaimInfoFromResourceClaim(resourceClaim)
}
// We delay checkpointing of this change until this call // We delay checkpointing of this change until this call
// returns successfully. It is OK to do this because we // returns successfully. It is OK to do this because we
// will only return successfully from this call if the // will only return successfully from this call if the
@ -120,31 +123,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
// started, so it's OK (actually correct) to not include it // started, so it's OK (actually correct) to not include it
// in the cache. // in the cache.
claimInfo.addPodReference(pod.UID) claimInfo.addPodReference(pod.UID)
if claimInfo.prepared {
// Already prepared this claim, no need to prepare it again
continue continue
} }
// Grab the allocation.resourceHandles. If there are no
// allocation.resourceHandles, create a single resourceHandle with no
// content. This will trigger processing of this claim by a single
// kubelet plugin whose name matches resourceClaim.Status.DriverName.
resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
if len(resourceHandles) == 0 {
resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
}
// Create a claimInfo object to store the relevant claim info.
claimInfo := newClaimInfo(
resourceClaim.Status.DriverName,
resourceClaim.Spec.ResourceClassName,
resourceClaim.UID,
resourceClaim.Name,
resourceClaim.Namespace,
sets.New(string(pod.UID)),
resourceHandles,
)
// Loop through all plugins and prepare for calling NodePrepareResources. // Loop through all plugins and prepare for calling NodePrepareResources.
for _, resourceHandle := range resourceHandles { for _, resourceHandle := range claimInfo.ResourceHandles {
// If no DriverName is provided in the resourceHandle, we // If no DriverName is provided in the resourceHandle, we
// use the DriverName from the status // use the DriverName from the status
pluginName := resourceHandle.DriverName pluginName := resourceHandle.DriverName
@ -193,6 +179,8 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err) return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
} }
// mark claim as (successfully) prepared by manager, so next time we dont prepare it.
claimInfo.prepared = true
// TODO: We (re)add the claimInfo object to the cache and // TODO: We (re)add the claimInfo object to the cache and
// sync it to the checkpoint *after* the // sync it to the checkpoint *after* the
@ -291,8 +279,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
} }
claimInfo.RLock() claimInfo.RLock()
klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimInfo.annotations) claimAnnotations := claimInfo.annotationsAsList()
annotations = append(annotations, claimInfo.annotations...) klog.V(3).InfoS("Add resource annotations", "claim", *claimName, "annotations", claimAnnotations)
annotations = append(annotations, claimAnnotations...)
for _, devices := range claimInfo.CDIDevices { for _, devices := range claimInfo.CDIDevices {
for _, device := range devices { for _, device := range devices {
cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device}) cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})