Merge pull request #114364 from bart0sh/PR102-prepare-DRA-resources-before-CNI-setup

kubelet: prepare DRA resources before CNI setup
This commit is contained in:
Kubernetes Prow Robot 2023-02-07 08:09:04 -08:00 committed by GitHub
commit 5437d493da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 131 additions and 110 deletions

View File

@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -118,11 +117,11 @@ type ContainerManager interface {
// GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
GetNodeAllocatableAbsolute() v1.ResourceList
// PrepareResource prepares pod resources
PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error)
// PrepareDynamicResources prepares dynamic pod resources
PrepareDynamicResources(*v1.Pod) error
// UnrepareResources unprepares pod resources
UnprepareResources(*v1.Pod) error
// UnprepareDynamicResources unprepares dynamic pod resources
UnprepareDynamicResources(*v1.Pod) error
// PodMightNeedToUnprepareResources returns true if the pod with the given UID
// might need to unprepare resources.

View File

@ -656,8 +656,8 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandl
// TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
opts := &kubecontainer.RunContainerOptions{}
if cm.draManager != nil {
resOpts, err := cm.PrepareResources(pod, container)
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
resOpts, err := cm.draManager.GetResources(pod, container)
if err != nil {
return nil, err
}
@ -1040,18 +1040,14 @@ func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresources
return containerMemories
}
func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return cm.draManager.PrepareResources(pod, container)
func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
return cm.draManager.PrepareResources(pod)
}
func (cm *containerManagerImpl) UnprepareResources(pod *v1.Pod) error {
func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error {
return cm.draManager.UnprepareResources(pod)
}
func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
if cm.draManager != nil {
return cm.draManager.PodMightNeedToUnprepareResources(UID)
}
return false
return cm.draManager.PodMightNeedToUnprepareResources(UID)
}

View File

@ -25,7 +25,6 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -156,11 +155,11 @@ func (cm *containerManagerStub) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}
func (cm *containerManagerStub) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return nil, nil
func (cm *containerManagerStub) PrepareDynamicResources(pod *v1.Pod) error {
return nil
}
func (cm *containerManagerStub) UnprepareResources(*v1.Pod) error {
func (cm *containerManagerStub) UnprepareDynamicResources(*v1.Pod) error {
return nil
}

View File

@ -39,7 +39,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -254,11 +253,11 @@ func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}
func (cm *containerManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return nil, nil
func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
return nil
}
func (cm *containerManagerImpl) UnprepareResources(*v1.Pod) error {
func (cm *containerManagerImpl) UnprepareDynamicResources(*v1.Pod) error {
return nil
}

View File

@ -71,85 +71,87 @@ func generateCDIAnnotations(
return kubeAnnotations, nil
}
// prepareContainerResources attempts to prepare all of required resource
// PrepareResources attempts to prepare all of the required resource
// plugin resources for the input pod, issue a NodePrepareResource rpc request
// for each new resource requirement, process their responses and update the cached
// containerResources on success.
func (m *ManagerImpl) prepareContainerResources(pod *v1.Pod, container *v1.Container) error {
func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
// Process resources for each resource claim referenced by container
for range container.Resources.Claims {
for i, podResourceClaim := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
for range container.Resources.Claims {
for i, podResourceClaim := range pod.Spec.ResourceClaims {
claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
// resource is already prepared, add pod UID to it
claimInfo.addPodReference(pod.UID)
if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
// resource is already prepared, add pod UID to it
claimInfo.addPodReference(pod.UID)
continue
}
continue
}
// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Get(
context.TODO(),
claimName,
metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
}
// Query claim object from the API server
resourceClaim, err := m.kubeClient.ResourceV1alpha1().ResourceClaims(pod.Namespace).Get(
context.TODO(),
claimName,
metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", claimName, pod.Name, err)
}
// Check if pod is in the ReservedFor for the claim
if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
}
// Check if pod is in the ReservedFor for the claim
if !resourceclaim.IsReservedForPod(pod, resourceClaim) {
return fmt.Errorf("pod %s(%s) is not allowed to use resource claim %s(%s)",
pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
}
// Call NodePrepareResource RPC
driverName := resourceClaim.Status.DriverName
// Call NodePrepareResource RPC
driverName := resourceClaim.Status.DriverName
client, err := dra.NewDRAPluginClient(driverName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err)
}
client, err := dra.NewDRAPluginClient(driverName)
if err != nil {
return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err)
}
response, err := client.NodePrepareResource(
context.Background(),
resourceClaim.Namespace,
resourceClaim.UID,
resourceClaim.Name,
resourceClaim.Status.Allocation.ResourceHandle)
if err != nil {
return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err)
}
klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices)
if err != nil {
return fmt.Errorf("failed to generate container annotations, err: %+v", err)
}
// Cache prepared resource
err = m.cache.add(
resourceClaim.Name,
resourceClaim.Namespace,
&claimInfo{
driverName: driverName,
claimUID: resourceClaim.UID,
claimName: resourceClaim.Name,
namespace: resourceClaim.Namespace,
podUIDs: sets.New(string(pod.UID)),
cdiDevices: response.CdiDevices,
annotations: annotations,
})
if err != nil {
return fmt.Errorf(
"failed to cache prepared resource, claim: %s(%s), err: %+v",
resourceClaim.Name,
response, err := client.NodePrepareResource(
context.Background(),
resourceClaim.Namespace,
resourceClaim.UID,
err,
)
resourceClaim.Name,
resourceClaim.Status.Allocation.ResourceHandle)
if err != nil {
return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err)
}
klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices)
if err != nil {
return fmt.Errorf("failed to generate container annotations, err: %+v", err)
}
// Cache prepared resource
err = m.cache.add(
resourceClaim.Name,
resourceClaim.Namespace,
&claimInfo{
driverName: driverName,
claimUID: resourceClaim.UID,
claimName: resourceClaim.Name,
namespace: resourceClaim.Namespace,
podUIDs: sets.New(string(pod.UID)),
cdiDevices: response.CdiDevices,
annotations: annotations,
})
if err != nil {
return fmt.Errorf(
"failed to cache prepared resource, claim: %s(%s), err: %+v",
resourceClaim.Name,
resourceClaim.UID,
err,
)
}
}
}
}
@ -157,9 +159,9 @@ func (m *ManagerImpl) prepareContainerResources(pod *v1.Pod, container *v1.Conta
return nil
}
// getContainerInfo gets a container info from the claimInfo cache.
// GetResources gets a ContainerInfo object from the claimInfo cache.
// This information is used by the caller to update a container config.
func (m *ManagerImpl) getContainerInfo(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
annotations := []kubecontainer.Annotation{}
for i, podResourceClaim := range pod.Spec.ResourceClaims {
@ -183,15 +185,6 @@ func (m *ManagerImpl) getContainerInfo(pod *v1.Pod, container *v1.Container) (*C
return &ContainerInfo{Annotations: annotations}, nil
}
// PrepareResources calls plugin NodePrepareResource from the registered DRA resource plugins.
func (m *ManagerImpl) PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error) {
if err := m.prepareContainerResources(pod, container); err != nil {
return nil, err
}
return m.getContainerInfo(pod, container)
}
// UnprepareResources calls a plugin's NodeUnprepareResource API for each resource claim owned by a pod.
// This function is idempotent and may be called multiple times against the same pod.
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have

View File

@ -24,14 +24,17 @@ import (
// Manager manages all the DRA resource plugins running on a node.
type Manager interface {
// PrepareResources prepares resources for a container in a pod.
// It communicates with the DRA resource plugin to prepare resources and
// returns resource info to trigger CDI injection by the runtime.
PrepareResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error)
// PrepareResources prepares resources for a pod.
// It communicates with the DRA resource plugin to prepare resources.
PrepareResources(pod *v1.Pod) error
// UnprepareResources calls NodeUnprepareResource GRPC from DRA plugin to unprepare pod resources
UnprepareResources(pod *v1.Pod) error
// GetResources gets a ContainerInfo object from the claimInfo cache.
// This information is used by the caller to update a container config.
GetResources(pod *v1.Pod, container *v1.Container) (*ContainerInfo, error)
// PodMightNeedToUnprepareResources returns true if the pod with the given UID
// might need to unprepare resources.
PodMightNeedToUnprepareResources(UID types.UID) bool

View File

@ -26,7 +26,6 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -239,11 +238,11 @@ func (cm *FakeContainerManager) GetNodeAllocatableAbsolute() v1.ResourceList {
return nil
}
func (cm *FakeContainerManager) PrepareResources(pod *v1.Pod, container *v1.Container) (*dra.ContainerInfo, error) {
return nil, nil
func (cm *FakeContainerManager) PrepareDynamicResources(pod *v1.Pod) error {
return nil
}
func (cm *FakeContainerManager) UnprepareResources(*v1.Pod) error {
func (cm *FakeContainerManager) UnprepareDynamicResources(*v1.Pod) error {
return nil
}

View File

@ -60,6 +60,12 @@ type RuntimeHelper interface {
// GetOrCreateUserNamespaceMappings returns the configuration for the sandbox user namespace
GetOrCreateUserNamespaceMappings(pod *v1.Pod) (*runtimeapi.UserNamespace, error)
// PrepareDynamicResources prepares resources for a pod.
PrepareDynamicResources(pod *v1.Pod) error
// UnprepareDynamicResources unprepares resources for a pod.
UnprepareDynamicResources(pod *v1.Pod) error
}
// ShouldContainerBeRestarted checks whether a container needs to be restarted.

View File

@ -71,3 +71,11 @@ func (f *FakeRuntimeHelper) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int6
func (f *FakeRuntimeHelper) GetOrCreateUserNamespaceMappings(pod *v1.Pod) (*runtimeapi.UserNamespace, error) {
return nil, nil
}
// PrepareDynamicResources is the fake implementation of the RuntimeHelper
// method of the same name. It performs no work and always returns nil.
func (f *FakeRuntimeHelper) PrepareDynamicResources(pod *v1.Pod) error {
return nil
}
// UnprepareDynamicResources is the fake implementation of the RuntimeHelper
// method of the same name. It performs no work and always returns nil.
func (f *FakeRuntimeHelper) UnprepareDynamicResources(pod *v1.Pod) error {
return nil
}

View File

@ -1943,7 +1943,7 @@ func (kl *Kubelet) syncTerminatingPod(_ context.Context, pod *v1.Pod, podStatus
// and BEFORE the pod status is changed on the API server
// to avoid race conditions with the resource deallocation code in kubernetes core.
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if err := kl.containerManager.UnprepareResources(pod); err != nil {
if err := kl.UnprepareDynamicResources(pod); err != nil {
return err
}
}
@ -2613,3 +2613,15 @@ func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContainerRemoved doesn't affect pod state
return event.Type != pleg.ContainerRemoved
}
// PrepareDynamicResources hands the pod to the container manager's
// PrepareDynamicResources call and returns that call's error unchanged.
// It is provided so that Kubelet implements the RuntimeHelper interface.
func (kl *Kubelet) PrepareDynamicResources(pod *v1.Pod) error {
	cm := kl.containerManager
	return cm.PrepareDynamicResources(pod)
}
// UnprepareDynamicResources hands the pod to the container manager's
// UnprepareDynamicResources call and returns that call's error unchanged.
// It is provided so that Kubelet implements the RuntimeHelper interface.
func (kl *Kubelet) UnprepareDynamicResources(pod *v1.Pod) error {
	cm := kl.containerManager
	return cm.UnprepareDynamicResources(pod)
}

View File

@ -764,6 +764,13 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
// When runc supports slash as sysctl separator, this function can no longer be used.
sysctl.ConvertPodSysctlsVariableToDotsSeparator(pod.Spec.SecurityContext)
// Prepare resources allocated by the Dynamic Resource Allocation feature for the pod
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
if m.runtimeHelper.PrepareDynamicResources(pod) != nil {
return
}
}
podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
if err != nil {
// createPodSandbox can return an error from CNI, CSI,