DRA: get rid of unneeded loops over pod containers

Ed Bartosh 2023-03-15 09:41:30 +02:00
parent 37937bb227
commit 1aeec10efb


@@ -66,112 +66,107 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
	// Process resources for each resource claim referenced by container
-	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
-		for range container.Resources.Claims {
	for i := range pod.Spec.ResourceClaims {
		claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
		klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)

		// Resource is already prepared, add pod UID to it
		if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
			// We delay checkpointing of this change until this call
			// returns successfully. It is OK to do this because we
			// will only return successfully from this call if the
			// checkpoint has succeeded. That means if the kubelet is
			// ever restarted before this checkpoint succeeds, the pod
			// whose resources are being prepared would never have
			// started, so it's OK (actually correct) to not include it
			// in the cache.
			claimInfo.addPodReference(pod.UID)
			continue
		}

		// Query claim object from the API server
		resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
			context.TODO(),
			claimName,
			metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
		}

		// Check if pod is in the ReservedFor for the claim
		if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
			return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
				pod.Name, pod.UID, claimName, resourceClaim.UID)
		}

		// Grab the allocation.resourceHandles. If there are no
		// allocation.resourceHandles, create a single resourceHandle with no
		// content. This will trigger processing of this claim by a single
		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
		if len(resourceHandles) == 0 {
			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
		}

		// Create a claimInfo object to store the relevant claim info.
		claimInfo := newClaimInfo(
			resourceClaim.Status.DriverName,
			resourceClaim.Spec.ResourceClassName,
			resourceClaim.UID,
			resourceClaim.Name,
			resourceClaim.Namespace,
			sets.New(string(pod.UID)),
		)

		// Walk through each resourceHandle
		for _, resourceHandle := range resourceHandles {
			// If no DriverName is provided in the resourceHandle, we
			// use the DriverName from the status
			pluginName := resourceHandle.DriverName
			if pluginName == "" {
				pluginName = resourceClaim.Status.DriverName
			}

			// Call NodePrepareResource RPC for each resourceHandle
			client, err := dra.NewDRAPluginClient(pluginName)
			if err != nil {
				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
			}
			response, err := client.NodePrepareResource(
				context.Background(),
				resourceClaim.Namespace,
				resourceClaim.UID,
				resourceClaim.Name,
				resourceHandle.Data)
			if err != nil {
				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
					resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
			}
			klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)

			// Add the CDI Devices returned by NodePrepareResource to
			// the claimInfo object.
			err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
			if err != nil {
				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
			}

			// TODO: We (re)add the claimInfo object to the cache and
			// sync it to the checkpoint *after* the
			// NodePrepareResource call has completed. This will cause
			// issues if the kubelet gets restarted between
			// NodePrepareResource and syncToCheckpoint. It will result
			// in not calling NodeUnprepareResource for this claim
			// because no claimInfo will be synced back to the cache
			// for it after the restart. We need to resolve this issue
			// before moving to beta.
			m.cache.add(claimInfo)

			// Checkpoint to reduce redundant calls to
			// NodePrepareResource() after a kubelet restart.
			err = m.cache.syncToCheckpoint()
			if err != nil {
				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
			}
		}
	}
-		}
-	}
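
In short, the old code wrapped the claim processing in loops over every container and every claim reference inside each container, even though container.Resources.Claims entries only name claims that are already listed in pod.Spec.ResourceClaims, so a single pass over the pod-level list covers everything. A minimal sketch of the structural change (not the actual kubelet code; prepareClaim is a hypothetical stand-in for the loop body shown in the diff):

// Old structure: redundant outer loops over containers and their claim
// references; each outer iteration re-walked the same pod-level claim list.
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
	for range container.Resources.Claims {
		for i := range pod.Spec.ResourceClaims {
			if err := prepareClaim(pod, &pod.Spec.ResourceClaims[i]); err != nil { // hypothetical helper
				return err
			}
		}
	}
}

// New structure: one pass over the pod-level claim list is sufficient.
for i := range pod.Spec.ResourceClaims {
	if err := prepareClaim(pod, &pod.Spec.ResourceClaims[i]); err != nil {
		return err
	}
}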