diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index cb3a265073a..aae7d3231cf 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" - "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -118,11 +117,11 @@ type ContainerManager interface { // GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement. GetNodeAllocatableAbsolute() v1.ResourceList - // PrepareResource prepares pod resources - PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) + // PrepareDynamicResources prepares dynamic pod resources + PrepareDynamicResources(*v1.Pod) error - // UnrepareResources unprepares pod resources - UnprepareResources(*v1.Pod) error + // UnprepareDynamicResources unprepares dynamic pod resources + UnprepareDynamicResources(*v1.Pod) error // PodMightNeedToUnprepareResources returns true if the pod with the given UID // might need to unprepare resources. diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 96b84aac717..ff3b197aeb0 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -656,8 +656,8 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandl // TODO: move the GetResources logic to PodContainerManager. 
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { opts := &kubecontainer.RunContainerOptions{} - if cm.draManager != nil { - resOpts, err := cm.PrepareResources(pod, container) + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { + resOpts, err := cm.draManager.GetResources(pod, container) if err != nil { return nil, err } @@ -1040,18 +1040,14 @@ func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresources return containerMemories } -func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { - return cm.draManager.PrepareResources(pod, container) +func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error { + return cm.draManager.PrepareResources(pod) } -func (cm *containerManagerImpl) UnprepareResources(pod *v1.Pod) error { +func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error { return cm.draManager.UnprepareResources(pod) } func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool { - if cm.draManager != nil { - return cm.draManager.PodMightNeedToUnprepareResources(UID) - } - - return false + return cm.draManager.PodMightNeedToUnprepareResources(UID) } diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 4b2c1f87e8f..1a79c3b0f96 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -25,7 +25,6 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" - "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -156,11 +155,11 @@ func (cm *containerManagerStub) GetNodeAllocatableAbsolute() 
v1.ResourceList { return nil } -func (cm *containerManagerStub) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { - return nil, nil +func (cm *containerManagerStub) PrepareDynamicResources(pod *v1.Pod) error { + return nil } -func (cm *containerManagerStub) UnprepareResources(*v1.Pod) error { +func (cm *containerManagerStub) UnprepareDynamicResources(*v1.Pod) error { return nil } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index c34c8432527..a192341bf0b 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -39,7 +39,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" - "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -254,11 +253,11 @@ func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList { return nil } -func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { - return nil, nil +func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error { + return nil } -func (cm *containerManagerImpl) UnprepareResources(*v1.Pod) error { +func (cm *containerManagerImpl) UnprepareDynamicResources(*v1.Pod) error { return nil } diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 6e702023516..95eb9687c05 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -71,85 +71,87 @@ func generateCDIAnnotations( return kubeAnnotations, nil } -// prepareContainerResources attempts to prepare all of required resource +// PrepareResources attempts to prepare all of the required resource // plugin resources for the input container, issue an NodePrepareResource rpc 
request // for each new resource requirement, process their responses and update the cached // containerResources on success. -func (m *ManagerImpl) prepareContainerResources(pod *v1.Pod, container *v1.Container) error { +func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error { // Process resources for each resource claim referenced by container - for range container.Resources.Claims { - for i, podResourceClaim := range pod.Spec.ResourceClaims { - claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) - klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name) + for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) { + for range container.Resources.Claims { + for i, podResourceClaim := range pod.Spec.ResourceClaims { + claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i]) + klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name) - if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil { - // resource is already prepared, add pod UID to it - claimInfo.addPodReference(pod.UID) + if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil { + // resource is already prepared, add pod UID to it + claimInfo.addPodReference(pod.UID) - continue - } + continue + } - // Query claim object from the API server - resourceClaim, err := m.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Get( - context.TODO(), - claimName, - metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err) - } + // Query claim object from the API server + resourceClaim, err := m.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Get( + context.TODO(), + claimName, + metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err) + } - // Check if pod is in the ReservedFor for the claim - if 
!resourceclaim.IsReservedForPod(pod, resourceClaim) { - return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)", - pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID) - } + // Check if pod is in the ReservedFor for the claim + if !resourceclaim.IsReservedForPod(pod, resourceClaim) { + return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)", + pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID) + } - // Call NodePrepareResource RPC - driverName := resourceClaim.Status.DriverName + // Call NodePrepareResource RPC + driverName := resourceClaim.Status.DriverName - client, err := dra.NewDRAPluginClient(driverName) - if err != nil { - return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err) - } + client, err := dra.NewDRAPluginClient(driverName) + if err != nil { + return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err) + } - response, err := client.NodePrepareResource( - context.Background(), - resourceClaim.Namespace, - resourceClaim.UID, - resourceClaim.Name, - resourceClaim.Status.Allocation.ResourceHandle) - if err != nil { - return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v", - resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err) - } - - klog.V(3).InfoS("NodePrepareResource succeeded", "response", response) - - annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices) - if err != nil { - return fmt.Errorf("failed to generate container annotations, err: %+v", err) - } - - // Cache prepared resource - err = m.cache.add( - resourceClaim.Name, - resourceClaim.Namespace, - &claimInfo{ - driverName: driverName, - claimUID: resourceClaim.UID, - claimName: resourceClaim.Name, - namespace: resourceClaim.Namespace, - podUIDs: sets.New(string(pod.UID)), - cdiDevices: response.CdiDevices, - annotations: 
annotations, - }) - if err != nil { - return fmt.Errorf( - "failed to cache prepared resource, claim: %s(%s), err: %+v", - resourceClaim.Name, + response, err := client.NodePrepareResource( + context.Background(), + resourceClaim.Namespace, resourceClaim.UID, - err, - ) + resourceClaim.Name, + resourceClaim.Status.Allocation.ResourceHandle) + if err != nil { + return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v", + resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err) + } + + klog.V(3).InfoS("NodePrepareResource succeeded", "response", response) + + annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices) + if err != nil { + return fmt.Errorf("failed to generate container annotations, err: %+v", err) + } + + // Cache prepared resource + err = m.cache.add( + resourceClaim.Name, + resourceClaim.Namespace, + &claimInfo{ + driverName: driverName, + claimUID: resourceClaim.UID, + claimName: resourceClaim.Name, + namespace: resourceClaim.Namespace, + podUIDs: sets.New(string(pod.UID)), + cdiDevices: response.CdiDevices, + annotations: annotations, + }) + if err != nil { + return fmt.Errorf( + "failed to cache prepared resource, claim: %s(%s), err: %+v", + resourceClaim.Name, + resourceClaim.UID, + err, + ) + } } } } @@ -157,9 +159,9 @@ func (m *ManagerImpl) prepareContainerResources(pod *v1.Pod, container *v1.Conta return nil } -// getContainerInfo gets a container info from the claimInfo cache. +// GetResources gets a ContainerInfo object from the claimInfo cache. // This information is used by the caller to update a container config. 
-func (m *ManagerImpl) getContainerInfo(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { +func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { annotations := []kubecontainer.Annotation{} for i, podResourceClaim := range pod.Spec.ResourceClaims { @@ -183,15 +185,6 @@ func (m *ManagerImpl) getContainerInfo(pod *v1.Pod, container *v1.Container) (*C return &ContainerInfo{Annotations: annotations}, nil } -// PrepareResources calls plugin NodePrepareResource from the registered DRA resource plugins. -func (m *ManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) { - if err := m.prepareContainerResources(pod, container); err != nil { - return nil, err - } - - return m.getContainerInfo(pod, container) -} - // UnprepareResources calls a plugin's NodeUnprepareResource API for each resource claim owned by a pod. // This function is idempotent and may be called multiple times against the same pod. // As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have diff --git a/pkg/kubelet/cm/dra/types.go b/pkg/kubelet/cm/dra/types.go index 894b2f30507..6b52ad8ef5c 100644 --- a/pkg/kubelet/cm/dra/types.go +++ b/pkg/kubelet/cm/dra/types.go @@ -24,14 +24,17 @@ import ( // Manager manages all the DRA resource plugins running on a node. type Manager interface { - // PrepareResources prepares resources for a container in a pod. - // It communicates with the DRA resource plugin to prepare resources and - // returns resource info to trigger CDI injection by the runtime. - PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) + // PrepareResources prepares resources for a pod. + // It communicates with the DRA resource plugin to prepare resources. 
+ PrepareResources(pod *v1.Pod) error // UnprepareResources calls NodeUnprepareResource GRPC from DRA plugin to unprepare pod resources UnprepareResources(pod *v1.Pod) error + // GetResources gets a ContainerInfo object from the claimInfo cache. + // This information is used by the caller to update a container config. + GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) + // PodMightNeedToUnprepareResources returns true if the pod with the given UID // might need to unprepare resources. PodMightNeedToUnprepareResources(UID types.UID) bool diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index 3c3be7bed50..306a30aa9ed 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -26,7 +26,6 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" - "k8s.io/kubernetes/pkg/kubelet/cm/dra" "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -239,11 +238,11 @@ func (cm *FakeContainerManager) GetNodeAllocatableAbsolute() v1.ResourceList { return nil } -func (cm *FakeContainerManager) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) { - return nil, nil +func (cm *FakeContainerManager) PrepareDynamicResources(pod *v1.Pod) error { + return nil } -func (cm *FakeContainerManager) UnprepareResources(*v1.Pod) error { +func (cm *FakeContainerManager) UnprepareDynamicResources(*v1.Pod) error { return nil } diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 0de9d034e57..42a67211a74 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -60,6 +60,12 @@ type RuntimeHelper interface { // GetOrCreateUserNamespaceMappings returns the configuration for the sandbox user namespace 
GetOrCreateUserNamespaceMappings(pod *v1.Pod) (*runtimeapi.UserNamespace, error) + + // PrepareDynamicResources prepares resources for a pod. + PrepareDynamicResources(pod *v1.Pod) error + + // UnprepareDynamicResources unprepares resources for a pod. + UnprepareDynamicResources(pod *v1.Pod) error } // ShouldContainerBeRestarted checks whether a container needs to be restarted. diff --git a/pkg/kubelet/container/testing/fake_runtime_helper.go b/pkg/kubelet/container/testing/fake_runtime_helper.go index 02e06d174e8..36b6f6c3fd3 100644 --- a/pkg/kubelet/container/testing/fake_runtime_helper.go +++ b/pkg/kubelet/container/testing/fake_runtime_helper.go @@ -71,3 +71,11 @@ func (f *FakeRuntimeHelper) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int6 func (f *FakeRuntimeHelper) GetOrCreateUserNamespaceMappings(pod *v1.Pod) (*runtimeapi.UserNamespace, error) { return nil, nil } + +func (f *FakeRuntimeHelper) PrepareDynamicResources(pod *v1.Pod) error { + return nil +} + +func (f *FakeRuntimeHelper) UnprepareDynamicResources(pod *v1.Pod) error { + return nil +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d8eda75da1d..fd1d8a18c82 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1943,7 +1943,7 @@ func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus // and BEFORE the pod status is changed on the API server // to avoid race conditions with the resource deallocation code in kubernetes core. 
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - if err := kl.containerManager.UnprepareResources(pod); err != nil { + if err := kl.UnprepareDynamicResources(pod); err != nil { return err } } @@ -2613,3 +2613,15 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool { // ContainerRemoved doesn't affect pod state return event.Type != pleg.ContainerRemoved } + +// PrepareDynamicResources calls the container Manager PrepareDynamicResources API +// This method implements the RuntimeHelper interface +func (kl *Kubelet) PrepareDynamicResources(pod *v1.Pod) error { + return kl.containerManager.PrepareDynamicResources(pod) +} + +// UnprepareDynamicResources calls the container Manager UnprepareDynamicResources API +// This method implements the RuntimeHelper interface +func (kl *Kubelet) UnprepareDynamicResources(pod *v1.Pod) error { + return kl.containerManager.UnprepareDynamicResources(pod) +} diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index db0f1b21ee1..56958b3605d 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -764,6 +764,13 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po // When runc supports slash as sysctl separator, this function can no longer be used. sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext) + // Prepare resources allocated by the Dynamic Resource Allocation feature for the pod + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + if m.runtimeHelper.PrepareDynamicResources(pod) != nil { + return + } + } + podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt) if err != nil { // createPodSandbox can return an error from CNI, CSI,