DRA: get rid of unneeded loops over pod containers

Ed Bartosh 2023-03-15 09:41:30 +02:00
parent 37937bb227
commit 1aeec10efb
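Background for the change: a container's resources.claims entries only reference the pod-level spec.resourceClaims entries by name, so looping over (init) containers and their claim references before looping over pod.Spec.ResourceClaims made the manager revisit the same pod-level claims once per reference. A minimal sketch of that relationship, using only the standard k8s.io/api/core/v1 types (the pod shape and counts here are illustrative, not part of the commit):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			// The pod-level list is the single source of truth for claims.
			ResourceClaims: []v1.PodResourceClaim{{Name: "gpu"}},
			InitContainers: []v1.Container{{
				Name:      "init",
				Resources: v1.ResourceRequirements{Claims: []v1.ResourceClaim{{Name: "gpu"}}},
			}},
			Containers: []v1.Container{{
				Name:      "app",
				Resources: v1.ResourceRequirements{Claims: []v1.ResourceClaim{{Name: "gpu"}}},
			}},
		},
	}

	// Old shape: visits the pod-level claim once per container claim reference.
	oldVisits := 0
	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
		for range container.Resources.Claims {
			for range pod.Spec.ResourceClaims {
				oldVisits++
			}
		}
	}

	// New shape: visits each pod-level claim exactly once.
	newVisits := len(pod.Spec.ResourceClaims)

	fmt.Println(oldVisits, newVisits) // 2 1
}
```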


@@ -66,112 +66,107 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
-	// Process resources for each resource claim referenced by container
-	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
-		for range container.Resources.Claims {
-			for i := range pod.Spec.ResourceClaims {
-				claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
-				klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
+	for i := range pod.Spec.ResourceClaims {
+		claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
+		klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)

		// Resource is already prepared, add pod UID to it
		if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
			// We delay checkpointing of this change until this call
			// returns successfully. It is OK to do this because we
			// will only return successfully from this call if the
			// checkpoint has succeeded. That means if the kubelet is
			// ever restarted before this checkpoint succeeds, the pod
			// whose resources are being prepared would never have
			// started, so it's OK (actually correct) to not include it
			// in the cache.
			claimInfo.addPodReference(pod.UID)
			continue
		}

		// Query claim object from the API server
		resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
			context.TODO(),
			claimName,
			metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
		}

		// Check if pod is in the ReservedFor for the claim
		if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
			return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
				pod.Name, pod.UID, claimName, resourceClaim.UID)
		}

		// Grab the allocation.resourceHandles. If there are no
		// allocation.resourceHandles, create a single resourceHandle with no
		// content. This will trigger processing of this claim by a single
		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
		if len(resourceHandles) == 0 {
			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
		}

		// Create a claimInfo object to store the relevant claim info.
		claimInfo := newClaimInfo(
			resourceClaim.Status.DriverName,
			resourceClaim.Spec.ResourceClassName,
			resourceClaim.UID,
			resourceClaim.Name,
			resourceClaim.Namespace,
			sets.New(string(pod.UID)),
		)

		// Walk through each resourceHandle
		for _, resourceHandle := range resourceHandles {
			// If no DriverName is provided in the resourceHandle, we
			// use the DriverName from the status
			pluginName := resourceHandle.DriverName
			if pluginName == "" {
				pluginName = resourceClaim.Status.DriverName
			}

			// Call NodePrepareResource RPC for each resourceHandle
			client, err := dra.NewDRAPluginClient(pluginName)
			if err != nil {
				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
			}
			response, err := client.NodePrepareResource(
				context.Background(),
				resourceClaim.Namespace,
				resourceClaim.UID,
				resourceClaim.Name,
				resourceHandle.Data)
			if err != nil {
				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
					resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
			}
			klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)

			// Add the CDI Devices returned by NodePrepareResource to
			// the claimInfo object.
			err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
			if err != nil {
				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
			}

			// TODO: We (re)add the claimInfo object to the cache and
			// sync it to the checkpoint *after* the
			// NodePrepareResource call has completed. This will cause
			// issues if the kubelet gets restarted between
			// NodePrepareResource and syncToCheckpoint. It will result
			// in not calling NodeUnprepareResource for this claim
			// because no claimInfo will be synced back to the cache
			// for it after the restart. We need to resolve this issue
			// before moving to beta.
			m.cache.add(claimInfo)

			// Checkpoint to reduce redundant calls to
			// NodePrepareResource() after a kubelet restart.
			err = m.cache.syncToCheckpoint()
			if err != nil {
				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
			}
		}
	}
-		}
-	}
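For reference, the handle-to-plugin fallback that the inner loop relies on can be isolated in a small sketch: an allocation with no resourceHandles is treated as a single empty handle, and a handle without its own DriverName falls back to resourceClaim.Status.DriverName. The types and helper below are simplified stand-ins for illustration, not the kubelet's own:

```go
package main

import "fmt"

// resourceHandle is a simplified stand-in for resourcev1alpha2.ResourceHandle.
type resourceHandle struct {
	DriverName string
	Data       string
}

// pluginsToCall returns the plugin name for every handle that must be
// prepared, defaulting to a single empty handle when the allocation carries
// none, and to the status-level driver name when a handle does not name its
// own driver.
func pluginsToCall(statusDriverName string, handles []resourceHandle) []string {
	if len(handles) == 0 {
		// No handles in the allocation: one empty handle is processed by the
		// plugin named in resourceClaim.Status.DriverName.
		handles = make([]resourceHandle, 1)
	}
	names := make([]string, 0, len(handles))
	for _, h := range handles {
		name := h.DriverName
		if name == "" {
			name = statusDriverName
		}
		names = append(names, name)
	}
	return names
}

func main() {
	fmt.Println(pluginsToCall("gpu.example.com", nil))
	// [gpu.example.com]
	fmt.Println(pluginsToCall("gpu.example.com", []resourceHandle{{DriverName: "nic.example.com"}, {}}))
	// [nic.example.com gpu.example.com]
}
```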