From 639e887631344ff6f7c0d44e93edb07a738219cd Mon Sep 17 00:00:00 2001
From: Kevin Klues
Date: Tue, 30 Apr 2024 11:10:35 +0000
Subject: [PATCH] kubelet: DRA: add a reconcile loop to unprepare claims for deleted pods

Signed-off-by: Kevin Klues
---
 pkg/kubelet/cm/container_manager_linux.go | 10 ++-
 pkg/kubelet/cm/dra/manager.go             | 88 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 pkg/kubelet/cm/dra/types.go               |  5 ++
 3 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index a85d3d5ff27..fb666a8333f 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -305,7 +305,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 	}
 	cm.topologyManager.AddHintProvider(cm.deviceManager)
 
-	// initialize DRA manager
+	// Initialize DRA manager
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
 		klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
 		cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
@@ -564,6 +564,14 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
 
 	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
 
+	// Initialize DRA manager
+	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
+		err := cm.draManager.Start(dra.ActivePodsFunc(activePods), sourcesReady)
+		if err != nil {
+			return fmt.Errorf("start dra manager error: %w", err)
+		}
+	}
+
 	// Initialize CPU manager
 	err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
 	if err != nil {
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index f55037ef7d3..07ad55d71a2 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -19,27 +19,48 @@ package dra
 import (
 	"context"
 	"fmt"
+	"time"
 
 	v1 "k8s.io/api/core/v1"
 	resourceapi "k8s.io/api/resource/v1alpha2"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/dynamic-resource-allocation/resourceclaim"
 	"k8s.io/klog/v2"
 	drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
 	dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
 // draManagerStateFileName is the file name where dra manager stores its state
 const draManagerStateFileName = "dra_manager_state"
 
+// defaultReconcilePeriod is the default reconciliation period to keep all claim info state in sync.
+const defaultReconcilePeriod = 60 * time.Second
+
+// ActivePodsFunc is a function that returns a list of pods to reconcile.
+type ActivePodsFunc func() []*v1.Pod
+
 // ManagerImpl is the structure in charge of managing DRA resource Plugins.
 type ManagerImpl struct {
 	// cache contains cached claim info
 	cache *claimInfoCache
 
+	// reconcilePeriod is the duration between calls to reconcileLoop.
+	reconcilePeriod time.Duration
+
+	// activePods is a method for listing active pods on the node
+	// so all claim info state can be updated in the reconciliation loop.
+	activePods ActivePodsFunc
+
+	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
+	// We use it to determine when we can treat pods as inactive and react appropriately.
+	sourcesReady config.SourcesReady
+
 	// KubeClient reference
 	kubeClient clientset.Interface
 }
@@ -53,14 +74,77 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
 		return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
 	}
 
+	// TODO: for now the reconcile period is not configurable.
+	// We should consider making it configurable in the future.
+	reconcilePeriod := defaultReconcilePeriod
+
 	manager := &ManagerImpl{
-		cache:      claimInfoCache,
-		kubeClient: kubeClient,
+		cache:           claimInfoCache,
+		kubeClient:      kubeClient,
+		reconcilePeriod: reconcilePeriod,
+		activePods:      nil,
+		sourcesReady:    nil,
 	}
 
 	return manager, nil
 }
 
+// Start starts the reconcile loop of the manager.
+func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
+	m.activePods = activePods
+	m.sourcesReady = sourcesReady
+	go wait.Until(func() { m.reconcileLoop() }, m.reconcilePeriod, wait.NeverStop)
+	return nil
+}
+
+// reconcileLoop ensures that any stale state in the manager's claimInfoCache gets periodically reconciled.
+func (m *ManagerImpl) reconcileLoop() {
+	// Only once all sources are ready do we attempt to reconcile.
+	// This ensures that the call to m.activePods() below will succeed with
+	// the actual active pods list.
+	if m.sourcesReady == nil || !m.sourcesReady.AllReady() {
+		return
+	}
+
+	// Get the full list of active pods.
+	activePods := sets.New[string]()
+	for _, p := range m.activePods() {
+		activePods.Insert(string(p.UID))
+	}
+
+	// Get the list of inactive pods still referenced by any claimInfos.
+	type podClaims struct {
+		uid        types.UID
+		namespace  string
+		claimNames []string
+	}
+	inactivePodClaims := make(map[string]*podClaims)
+	m.cache.RLock()
+	for _, claimInfo := range m.cache.claimInfo {
+		for podUID := range claimInfo.PodUIDs {
+			if activePods.Has(podUID) {
+				continue
+			}
+			if inactivePodClaims[podUID] == nil {
+				inactivePodClaims[podUID] = &podClaims{
+					uid:        types.UID(podUID),
+					namespace:  claimInfo.Namespace,
+					claimNames: []string{},
+				}
+			}
+			inactivePodClaims[podUID].claimNames = append(inactivePodClaims[podUID].claimNames, claimInfo.ClaimName)
+		}
+	}
+	m.cache.RUnlock()
+
+	// Loop through all inactive pods and call UnprepareResources on them.
+	for _, podClaims := range inactivePodClaims {
+		if err := m.unprepareResources(podClaims.uid, podClaims.namespace, podClaims.claimNames); err != nil {
+			klog.ErrorS(err, "Unpreparing pod resources in reconcile loop", "podUID", podClaims.uid)
+		}
+	}
+}
+
 // PrepareResources attempts to prepare all of the required resource
 // plugin resources for the input container, issue NodePrepareResources rpc requests
 // for each new resource requirement, process their responses and update the cached
diff --git a/pkg/kubelet/cm/dra/types.go b/pkg/kubelet/cm/dra/types.go
index 58c8ca0dd65..e009e952eb4 100644
--- a/pkg/kubelet/cm/dra/types.go
+++ b/pkg/kubelet/cm/dra/types.go
@@ -19,11 +19,16 @@ package dra
 import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
 // Manager manages all the DRA resource plugins running on a node.
 type Manager interface {
+	// Start starts the reconcile loop of the manager.
+	// This will ensure that all claims are unprepared even if pods get deleted unexpectedly.
+	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
+
 	// PrepareResources prepares resources for a pod.
 	// It communicates with the DRA resource plugin to prepare resources.
 	PrepareResources(pod *v1.Pod) error
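
Reviewer note (not part of the patch): below is a minimal, standalone sketch of the reconcile pattern the new loop follows: gate on source readiness, snapshot the active pods, diff that snapshot against the pods still referenced by cached claims, and unprepare the claims of any pod that is gone. All names here (manager, claimInfo, allReady, and the Printf standing in for unprepareResources) are illustrative stand-ins, and plain maps plus a direct call replace the sets.New and wait.Until used in the real code.

package main

import (
	"fmt"
	"sync"
)

// claimInfo is a stand-in for the kubelet's cached claim state: a prepared
// claim plus the UIDs of the pods that still reference it.
type claimInfo struct {
	claimName string
	podUIDs   map[string]struct{}
}

// manager is a stand-in for ManagerImpl with just enough state for the loop.
type manager struct {
	mu         sync.RWMutex
	claims     []*claimInfo
	activePods func() map[string]struct{} // stand-in for ActivePodsFunc
	allReady   func() bool                // stand-in for sourcesReady.AllReady
}

// reconcileLoop mirrors the patch's logic: do nothing until all pod sources
// are ready, then unprepare claims referenced only by deleted pods.
func (m *manager) reconcileLoop() {
	if m.allReady == nil || !m.allReady() {
		return
	}
	active := m.activePods()

	// Collect, under the read lock, the claims still referenced by pods
	// that are no longer active (keyed by pod UID).
	inactive := map[string][]string{}
	m.mu.RLock()
	for _, ci := range m.claims {
		for uid := range ci.podUIDs {
			if _, ok := active[uid]; !ok {
				inactive[uid] = append(inactive[uid], ci.claimName)
			}
		}
	}
	m.mu.RUnlock()

	// The real code calls m.unprepareResources here, which issues
	// NodeUnprepareResources RPCs and prunes the cache; we just print.
	for uid, claims := range inactive {
		fmt.Printf("unpreparing claims %v for deleted pod %s\n", claims, uid)
	}
}

func main() {
	m := &manager{
		claims: []*claimInfo{{
			claimName: "gpu-claim",
			podUIDs:   map[string]struct{}{"pod-a": {}, "pod-b": {}},
		}},
		// pod-b has been deleted, so only pod-a is still active.
		activePods: func() map[string]struct{} {
			return map[string]struct{}{"pod-a": {}}
		},
		allReady: func() bool { return true },
	}
	// The kubelet drives this from Start() via
	// go wait.Until(m.reconcileLoop, m.reconcilePeriod, wait.NeverStop);
	// one direct call is enough to show the effect.
	m.reconcileLoop()
	// Output: unpreparing claims [gpu-claim] for deleted pod pod-b
}

Note the two-phase shape, which the patch also uses: the inactive set is computed under the cache's read lock, and the unprepare calls happen only after RUnlock, so slow plugin RPCs in the reconcile loop never hold the cache lock against PrepareResources for new pods.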