Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-07 11:13:48 +00:00)
DRA: get rid of unneeded loops over pod containers
This commit is contained in:
parent 37937bb227
commit 1aeec10efb
@@ -66,112 +66,107 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (
 // for each new resource requirement, process their responses and update the cached
 // containerResources on success.
 func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
-	// Process resources for each resource claim referenced by container
-	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
-		for range container.Resources.Claims {
 	for i := range pod.Spec.ResourceClaims {
 		claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 		klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)

 		// Resource is already prepared, add pod UID to it
 		if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
 			// We delay checkpointing of this change until this call
 			// returns successfully. It is OK to do this because we
 			// will only return successfully from this call if the
 			// checkpoint has succeeded. That means if the kubelet is
 			// ever restarted before this checkpoint succeeds, the pod
 			// whose resources are being prepared would never have
 			// started, so it's OK (actually correct) to not include it
 			// in the cache.
 			claimInfo.addPodReference(pod.UID)
 			continue
 		}

 		// Query claim object from the API server
 		resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get(
 			context.TODO(),
 			claimName,
 			metav1.GetOptions{})
 		if err != nil {
 			return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
 		}

 		// Check if pod is in the ReservedFor for the claim
 		if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
 			return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
 				pod.Name, pod.UID, claimName, resourceClaim.UID)
 		}

 		// Grab the allocation.resourceHandles. If there are no
 		// allocation.resourceHandles, create a single resourceHandle with no
 		// content. This will trigger processing of this claim by a single
 		// kubelet plugin whose name matches resourceClaim.Status.DriverName.
 		resourceHandles := resourceClaim.Status.Allocation.ResourceHandles
 		if len(resourceHandles) == 0 {
 			resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1)
 		}

 		// Create a claimInfo object to store the relevant claim info.
 		claimInfo := newClaimInfo(
 			resourceClaim.Status.DriverName,
 			resourceClaim.Spec.ResourceClassName,
 			resourceClaim.UID,
 			resourceClaim.Name,
 			resourceClaim.Namespace,
 			sets.New(string(pod.UID)),
 		)

 		// Walk through each resourceHandle
 		for _, resourceHandle := range resourceHandles {
 			// If no DriverName is provided in the resourceHandle, we
 			// use the DriverName from the status
 			pluginName := resourceHandle.DriverName
 			if pluginName == "" {
 				pluginName = resourceClaim.Status.DriverName
 			}

 			// Call NodePrepareResource RPC for each resourceHandle
 			client, err := dra.NewDRAPluginClient(pluginName)
 			if err != nil {
 				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", pluginName, err)
 			}
 			response, err := client.NodePrepareResource(
 				context.Background(),
 				resourceClaim.Namespace,
 				resourceClaim.UID,
 				resourceClaim.Name,
 				resourceHandle.Data)
 			if err != nil {
 				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
 					resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
 			}
 			klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", pluginName, "response", response)

 			// Add the CDI Devices returned by NodePrepareResource to
 			// the claimInfo object.
 			err = claimInfo.addCDIDevices(pluginName, response.CdiDevices)
 			if err != nil {
 				return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
 			}

 			// TODO: We (re)add the claimInfo object to the cache and
 			// sync it to the checkpoint *after* the
 			// NodePrepareResource call has completed. This will cause
 			// issues if the kubelet gets restarted between
 			// NodePrepareResource and syncToCheckpoint. It will result
 			// in not calling NodeUnprepareResource for this claim
 			// because no claimInfo will be synced back to the cache
 			// for it after the restart. We need to resolve this issue
 			// before moving to beta.
 			m.cache.add(claimInfo)

 			// Checkpoint to reduce redundant calls to
 			// NodePrepareResource() after a kubelet restart.
 			err = m.cache.syncToCheckpoint()
 			if err != nil {
 				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 			}
 		}
-		}
-	}
 	}
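Why the outer loops were unneeded: in the v1 Pod API, resource claims are declared once at the pod level in pod.Spec.ResourceClaims, and the container.Resources.Claims entries only reference those pod-level declarations by name. A single pass over pod.Spec.ResourceClaims therefore already visits every claim the pod can use, while the removed per-container loops repeated that same pass once for every container-level reference and mostly hit the "Resource is already prepared" branch shown above. The following standalone Go sketch is illustrative only and not part of this commit; the pod object and claim names are invented. It contrasts the two iteration patterns:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func main() {
	// Hypothetical pod: two claims declared once at the pod level and
	// referenced by name from the containers that need them.
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			ResourceClaims: []v1.PodResourceClaim{
				{Name: "gpu"},
				{Name: "nic"},
			},
			Containers: []v1.Container{
				{
					Name: "app",
					Resources: v1.ResourceRequirements{
						Claims: []v1.ResourceClaim{{Name: "gpu"}, {Name: "nic"}},
					},
				},
				{
					Name: "sidecar",
					Resources: v1.ResourceRequirements{
						Claims: []v1.ResourceClaim{{Name: "gpu"}},
					},
				},
			},
		},
	}

	// New pattern: one pass over the pod-level list visits each claim
	// exactly once (2 iterations for this pod).
	for i := range pod.Spec.ResourceClaims {
		fmt.Println("prepare claim:", pod.Spec.ResourceClaims[i].Name)
	}

	// Removed pattern: the pod-level list was walked once per
	// container-level claim reference (3 full passes for this pod).
	passes := 0
	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
		for range container.Resources.Claims {
			passes++
		}
	}
	fmt.Println("passes over pod.Spec.ResourceClaims under the removed loops:", passes)
}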