kubelet: DRA: add a reconcile loop to unprepare claims for deleted pods

Signed-off-by: Kevin Klues <kklues@nvidia.com>
This commit is contained in:
Kevin Klues 2024-04-30 11:10:35 +00:00
parent a8931c6c25
commit 639e887631
3 changed files with 100 additions and 3 deletions

View File

@ -305,7 +305,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
// initialize DRA manager
// Initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
@ -564,6 +564,14 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
// Initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
err := cm.draManager.Start(dra.ActivePodsFunc(activePods), sourcesReady)
if err != nil {
return fmt.Errorf("start dra manager error: %w", err)
}
}
// Initialize CPU manager
err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {

View File

@ -19,27 +19,48 @@ package dra
import (
"context"
"fmt"
"time"
v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/dynamic-resource-allocation/resourceclaim"
"k8s.io/klog/v2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
dra "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// draManagerStateFileName is the file name where dra manager stores its state
const draManagerStateFileName = "dra_manager_state"
// defaultReconcilePeriod is the default reconciliation period to keep all claim info state in sync.
const defaultReconcilePeriod = 60 * time.Second
// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod
// ManagerImpl is the structure in charge of managing DRA resource Plugins.
type ManagerImpl struct {
// cache contains cached claim info
cache *claimInfoCache
// reconcilePeriod is the duration between calls to reconcileLoop.
reconcilePeriod time.Duration
// activePods is a method for listing active pods on the node
// so all claim info state can be updated in the reconciliation loop.
activePods ActivePodsFunc
// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
// We use it to determine when we can treat pods as inactive and react appropriately.
sourcesReady config.SourcesReady
// KubeClient reference
kubeClient clientset.Interface
}
@ -53,14 +74,77 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
}
// TODO: for now the reconcile period is not configurable.
// We should consider making it configurable in the future.
reconcilePeriod := defaultReconcilePeriod
manager := &ManagerImpl{
cache: claimInfoCache,
kubeClient: kubeClient,
cache: claimInfoCache,
kubeClient: kubeClient,
reconcilePeriod: reconcilePeriod,
activePods: nil,
sourcesReady: nil,
}
return manager, nil
}
// Start starts the reconcile loop of the manager.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
m.activePods = activePods
m.sourcesReady = sourcesReady
go wait.Until(func() { m.reconcileLoop() }, m.reconcilePeriod, wait.NeverStop)
return nil
}
// reconcileLoop ensures that any stale state in the manager's claimInfoCache gets periodically reconciled.
func (m *ManagerImpl) reconcileLoop() {
// Only once all sources are ready do we attempt to reconcile.
// This ensures that the call to m.activePods() below will succeed with
// the actual active pods list.
if m.sourcesReady == nil || !m.sourcesReady.AllReady() {
return
}
// Get the full list of active pods.
activePods := sets.New[string]()
for _, p := range m.activePods() {
activePods.Insert(string(p.UID))
}
// Get the list of inactive pods still referenced by any claimInfos.
type podClaims struct {
uid types.UID
namespace string
claimNames []string
}
inactivePodClaims := make(map[string]*podClaims)
m.cache.RLock()
for _, claimInfo := range m.cache.claimInfo {
for podUID := range claimInfo.PodUIDs {
if activePods.Has(podUID) {
continue
}
if inactivePodClaims[podUID] == nil {
inactivePodClaims[podUID] = &podClaims{
uid: types.UID(podUID),
namespace: claimInfo.Namespace,
claimNames: []string{},
}
}
inactivePodClaims[podUID].claimNames = append(inactivePodClaims[podUID].claimNames, claimInfo.ClaimName)
}
}
m.cache.RUnlock()
// Loop through all inactive pods and call UnprepareResources on them.
for _, podClaims := range inactivePodClaims {
if err := m.unprepareResources(podClaims.uid, podClaims.namespace, podClaims.claimNames); err != nil {
klog.ErrorS(err, "Unpreparing pod resources in reconcile loop", "podUID", podClaims.uid)
}
}
}
// PrepareResources attempts to prepare all of the required resource
// plugin resources for the input container, issue NodePrepareResources rpc requests
// for each new resource requirement, process their responses and update the cached

View File

@ -19,11 +19,16 @@ package dra
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// Manager manages all the DRA resource plugins running on a node.
type Manager interface {
// Start starts the reconcile loop of the manager.
// This will ensure that all claims are unprepared even if pods get deleted unexpectedly.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
// PrepareResources prepares resources for a pod.
// It communicates with the DRA resource plugin to prepare resources.
PrepareResources(pod *v1.Pod) error