From 1aeec10efb758c985c4fc5c45d4e36ae90c8bcfb Mon Sep 17 00:00:00 2001
From: Ed Bartosh
Date: Wed, 15 Mar 2023 09:41:30 +0200
Subject: [PATCH] DRA: get rid of unneeded loops over pod containers

---
 pkg/kubelet/cm/dra/manager.go | 187 +++++++++++++++++-----------------
 1 file changed, 91 insertions(+), 96 deletions(-)

diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index ea171f0b7b6..4eca6110311 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -66,112 +66,107 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
 // for each new resource requirement, process their responses and update the cached
 // containerResources on success.
 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
-	// Process resources for each resource claim referenced by container
-	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
-		for range container.Resources.Claims {
-			for i := range pod.Spec.ResourceClaims {
-				claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
-				klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
+	for i := range pod.Spec.ResourceClaims {
+		claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
+		klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
 
-				// Resource is already prepared, add pod UID to it
-				if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
-					// We delay checkpointing of this change until this call
-					// returns successfully. It is OK to do this because we
-					// will only return successfully from this call if the
-					// checkpoint has succeeded. That means if the kubelet is
-					// ever restarted before this checkpoint succeeds, the pod
-					// whose resources are being prepared would never have
-					// started, so it's OK (actually correct) to not include it
-					// in the cache.
-					claimInfo.addPodReference(pod.UID)
-					continue
-				}
+		// Resource is already prepared, add pod UID to it
+		if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
+			// We delay checkpointing of this change until this call
+			// returns successfully. It is OK to do this because we
+			// will only return successfully from this call if the
+			// checkpoint has succeeded. That means if the kubelet is
+			// ever restarted before this checkpoint succeeds, the pod
+			// whose resources are being prepared would never have
+			// started, so it's OK (actually correct) to not include it
+			// in the cache.
+			claimInfo.addPodReference(pod.UID)
+			continue
+		}
 
-				// Query claim object from the API server
-				resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
-					context.TODO(),
-					claimName,
-					metav1.GetOptions{})
-				if err != nil {
-					return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
-				}
+		// Query claim object from the API server
+		resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
+			context.TODO(),
+			claimName,
+			metav1.GetOptions{})
+		if err != nil {
+			return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
+		}
 
-				// Check if pod is in the ReservedFor for the claim
-				if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
-					return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
-						pod.Name, pod.UID, claimName, resourceClaim.UID)
-				}
+		// Check if pod is in the ReservedFor for the claim
+		if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
+			return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
+				pod.Name, pod.UID, claimName, resourceClaim.UID)
+		}
 
-				// Grab the allocation.resourceHandles. If there are no
-				// allocation.resourceHandles, create a single resourceHandle with no
-				// content. This will trigger processing of this claim by a single
-				// kubelet plugin whose name matches resourceClaim.Status.DriverName.
-				resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
-				if len(resourceHandles) == 0 {
-					resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
-				}
+		// Grab the allocation.resourceHandles. If there are no
+		// allocation.resourceHandles, create a single resourceHandle with no
+		// content. This will trigger processing of this claim by a single
+		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
+		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
+		if len(resourceHandles) == 0 {
+			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
+		}
 
-				// Create a claimInfo object to store the relevant claim info.
-				claimInfo := newClaimInfo(
-					resourceClaim.Status.DriverName,
-					resourceClaim.Spec.ResourceClassName,
-					resourceClaim.UID,
-					resourceClaim.Name,
-					resourceClaim.Namespace,
-					sets.New(string(pod.UID)),
-				)
+		// Create a claimInfo object to store the relevant claim info.
+		claimInfo := newClaimInfo(
+			resourceClaim.Status.DriverName,
+			resourceClaim.Spec.ResourceClassName,
+			resourceClaim.UID,
+			resourceClaim.Name,
+			resourceClaim.Namespace,
+			sets.New(string(pod.UID)),
+		)
 
-				// Walk through each resourceHandle
-				for _, resourceHandle := range resourceHandles {
-					// If no DriverName is provided in the resourceHandle, we
-					// use the DriverName from the status
-					pluginName := resourceHandle.DriverName
-					if pluginName == "" {
-						pluginName = resourceClaim.Status.DriverName
-					}
+		// Walk through each resourceHandle
+		for _, resourceHandle := range resourceHandles {
+			// If no DriverName is provided in the resourceHandle, we
+			// use the DriverName from the status
+			pluginName := resourceHandle.DriverName
+			if pluginName == "" {
+				pluginName = resourceClaim.Status.DriverName
+			}
 
-					// Call NodePrepareResource RPC for each resourceHandle
-					client, err := dra.NewDRAPluginClient(pluginName)
-					if err != nil {
-						return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
-					}
-					response, err := client.NodePrepareResource(
-						context.Background(),
-						resourceClaim.Namespace,
-						resourceClaim.UID,
-						resourceClaim.Name,
-						resourceHandle.Data)
-					if err != nil {
-						return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-							resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
-					}
-					klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)
+			// Call NodePrepareResource RPC for each resourceHandle
+			client, err := dra.NewDRAPluginClient(pluginName)
+			if err != nil {
+				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
+			}
+			response, err := client.NodePrepareResource(
+				context.Background(),
+				resourceClaim.Namespace,
+				resourceClaim.UID,
+				resourceClaim.Name,
+				resourceHandle.Data)
+			if err != nil {
+				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
+					resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
+			}
+			klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)
 
-					// Add the CDI Devices returned by NodePrepareResource to
-					// the claimInfo object.
-					err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
-					if err != nil {
-						return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
-					}
+			// Add the CDI Devices returned by NodePrepareResource to
+			// the claimInfo object.
+			err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
+			if err != nil {
+				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
+			}
 
-					// TODO: We (re)add the claimInfo object to the cache and
-					// sync it to the checkpoint *after* the
-					// NodePrepareResource call has completed. This will cause
-					// issues if the kubelet gets restarted between
-					// NodePrepareResource and syncToCheckpoint. It will result
-					// in not calling NodeUnprepareResource for this claim
-					// because no claimInfo will be synced back to the cache
-					// for it after the restart. We need to resolve this issue
-					// before moving to beta.
-					m.cache.add(claimInfo)
+			// TODO: We (re)add the claimInfo object to the cache and
+			// sync it to the checkpoint *after* the
+			// NodePrepareResource call has completed. This will cause
+			// issues if the kubelet gets restarted between
+			// NodePrepareResource and syncToCheckpoint. It will result
+			// in not calling NodeUnprepareResource for this claim
+			// because no claimInfo will be synced back to the cache
+			// for it after the restart. We need to resolve this issue
+			// before moving to beta.
+			m.cache.add(claimInfo)
 
-					// Checkpoint to reduce redundant calls to
-					// NodePrepareResource() after a kubelet restart.
-					err = m.cache.syncToCheckpoint()
-					if err != nil {
-						return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
-					}
-				}
+			// Checkpoint to reduce redundant calls to
+			// NodePrepareResource() after a kubelet restart.
+			err = m.cache.syncToCheckpoint()
+			if err != nil {
+				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+			}
+		}
+	}
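
For illustration only: the patch above keeps the per-claim preparation steps unchanged and just drops the redundant outer loops over init and regular containers, iterating pod.Spec.ResourceClaims directly. The standalone Go sketch below shows that loop shape; the types are simplified stand-ins for the real v1 API and kubelet DRA manager types, and prepareResources is a hypothetical function, not kubelet code.

	// Minimal standalone sketch of the loop shape introduced by this patch.
	// The types below are simplified stand-ins, not the real Kubernetes API
	// or kubelet DRA manager types.
	package main

	import "fmt"

	// PodResourceClaim and Pod mimic just enough of the v1 API surface to
	// show the iteration pattern.
	type PodResourceClaim struct{ Name string }

	type PodSpec struct {
		ResourceClaims []PodResourceClaim
	}

	type Pod struct {
		Name string
		Spec PodSpec
	}

	// prepareResources visits each pod-level resource claim exactly once, as
	// the new PrepareResources loop does, instead of once per container that
	// references the claim.
	func prepareResources(pod *Pod) error {
		for i := range pod.Spec.ResourceClaims {
			claimName := pod.Spec.ResourceClaims[i].Name
			// The real code looks the claim up in the kubelet cache, fetches
			// the ResourceClaim object, calls NodePrepareResource per
			// resource handle, and checkpoints the result here.
			fmt.Printf("processing claim %q for pod %q\n", claimName, pod.Name)
		}
		return nil
	}

	func main() {
		pod := &Pod{
			Name: "demo-pod",
			Spec: PodSpec{ResourceClaims: []PodResourceClaim{{Name: "gpu-claim"}}},
		}
		if err := prepareResources(pod); err != nil {
			fmt.Println("error:", err)
		}
	}

Because claim preparation does not depend on which container references a claim, walking pod.Spec.ResourceClaims directly prepares each claim exactly once per pod instead of repeating the same work for every referencing container.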