converge DRA and Device Plugin plugins registration

This commit is contained in:
Sergey Kanzhelev 2024-10-24 00:54:30 +00:00
parent 6435489064
commit 1297d0cdd1
10 changed files with 56 additions and 23 deletions

View File

@ -57,12 +57,14 @@ const (
type ActivePodsFunc func() []*v1.Pod type ActivePodsFunc func() []*v1.Pod
// GetNodeFunc is a function that returns the node object, or an error.
type GetNodeFunc func() (*v1.Node, error)
// Manages the containers running on a machine. // Manages the containers running on a machine.
type ContainerManager interface { type ContainerManager interface {
// Runs the container manager's housekeeping. // Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container. // - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run. // - Creates the system container where all non-containerized processes run.
Start(context.Context, *v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error Start(context.Context, *v1.Node, ActivePodsFunc, GetNodeFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error
// SystemCgroupsLimit returns resources allocated to system cgroups in the machine. // SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services. // These cgroups include the system and Kubernetes services.
@ -115,10 +117,10 @@ type ContainerManager interface {
// GetPodCgroupRoot returns the cgroup which contains all pods. // GetPodCgroupRoot returns the cgroup which contains all pods.
GetPodCgroupRoot() string GetPodCgroupRoot() string
// GetPluginRegistrationHandler returns a plugin registration handler // GetPluginRegistrationHandlers returns a set of plugin registration handlers
// The pluginwatcher's Handlers allow to have a single module for handling // The pluginwatcher's Handlers allow to have a single module for handling
// registration. // registration.
GetPluginRegistrationHandler() cache.PluginHandler GetPluginRegistrationHandlers() map[string]cache.PluginHandler
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation. // due to node recreation.

View File

@ -46,6 +46,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
utilsysctl "k8s.io/component-helpers/node/util/sysctl" utilsysctl "k8s.io/component-helpers/node/util/sysctl"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
@ -132,6 +133,8 @@ type containerManagerImpl struct {
topologyManager topologymanager.Manager topologyManager topologymanager.Manager
// Interface for Dynamic Resource Allocation management. // Interface for Dynamic Resource Allocation management.
draManager dra.Manager draManager dra.Manager
// kubeClient is the interface to the Kubernetes API server. May be nil if the kubelet is running in standalone mode.
kubeClient clientset.Interface
} }
type features struct { type features struct {
@ -312,6 +315,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err return nil, err
} }
} }
cm.kubeClient = kubeClient
// Initialize CPU manager // Initialize CPU manager
cm.cpuManager, err = cpumanager.NewManager( cm.cpuManager, err = cpumanager.NewManager(
@ -555,6 +559,7 @@ func (cm *containerManagerImpl) Status() Status {
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node, func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
activePods ActivePodsFunc, activePods ActivePodsFunc,
getNode GetNodeFunc,
sourcesReady config.SourcesReady, sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider, podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService, runtimeService internalapi.RuntimeService,
@ -564,7 +569,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
// Initialize DRA manager // Initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), sourcesReady) err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), dra.GetNodeFunc(getNode), sourcesReady)
if err != nil { if err != nil {
return fmt.Errorf("start dra manager error: %w", err) return fmt.Errorf("start dra manager error: %w", err)
} }
@ -649,8 +654,16 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
return nil return nil
} }
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler() res := map[string]cache.PluginHandler{
pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler(),
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler()
}
return res
} }
// TODO: move the GetResources logic to PodContainerManager. // TODO: move the GetResources logic to PodContainerManager.

View File

@ -46,7 +46,7 @@ type containerManagerStub struct {
var _ ContainerManager = &containerManagerStub{} var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
klog.V(2).InfoS("Starting stub container manager") klog.V(2).InfoS("Starting stub container manager")
return nil return nil
} }
@ -91,7 +91,7 @@ func (cm *containerManagerStub) GetCapacity(localStorageCapacityIsolation bool)
return c return c
} }
func (cm *containerManagerStub) GetPluginRegistrationHandler() cache.PluginHandler { func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
return nil return nil
} }

View File

@ -40,7 +40,7 @@ type unsupportedContainerManager struct {
var _ ContainerManager = &unsupportedContainerManager{} var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
return fmt.Errorf("Container Manager is unsupported in this build") return fmt.Errorf("Container Manager is unsupported in this build")
} }

View File

@ -35,6 +35,7 @@ import (
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission" "k8s.io/kubernetes/pkg/kubelet/cm/admission"
@ -72,6 +73,7 @@ func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttribute
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node, func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
activePods ActivePodsFunc, activePods ActivePodsFunc,
getNode GetNodeFunc,
sourcesReady config.SourcesReady, sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider, podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService, runtimeService internalapi.RuntimeService,
@ -176,8 +178,9 @@ func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool)
return cm.capacity return cm.capacity
} }
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler() // DRA is not supported on Windows, only device plugin is supported
return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler()}
} }
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {

View File

@ -59,6 +59,8 @@ type Manager interface {
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node. // and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
// GetWatcherHandler returns the plugin handler for the device manager.
GetWatcherHandler() cache.PluginHandler GetWatcherHandler() cache.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers // GetDevices returns information about the devices assigned to pods and containers

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
) )
// draManagerStateFileName is the file name where dra manager stores its state // draManagerStateFileName is the file name where dra manager stores its state
@ -48,6 +49,9 @@ const defaultReconcilePeriod = 60 * time.Second
// ActivePodsFunc is a function that returns a list of pods to reconcile. // ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod type ActivePodsFunc func() []*v1.Pod
// GetNodeFunc is a function that returns the node object using the kubelet's node lister.
// The manager stores the function it receives in Start and hands it to the
// plugin registration handler built by GetWatcherHandler.
type GetNodeFunc func() (*v1.Node, error)
// ManagerImpl is the structure in charge of managing DRA drivers. // ManagerImpl is the structure in charge of managing DRA drivers.
type ManagerImpl struct { type ManagerImpl struct {
// cache contains cached claim info // cache contains cached claim info
@ -66,6 +70,9 @@ type ManagerImpl struct {
// KubeClient reference // KubeClient reference
kubeClient clientset.Interface kubeClient clientset.Interface
// getNode is a function that returns the node object using the kubelet's node lister.
getNode GetNodeFunc
} }
// NewManagerImpl creates a new manager. // NewManagerImpl creates a new manager.
@ -90,9 +97,14 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
return manager, nil return manager, nil
} }
// GetWatcherHandler returns the plugin handler for the DRA.
// It builds a registration handler from the manager's kubeClient and getNode
// and exposes it as a cache.PluginHandler.
//
// NOTE(review): in the code visible here getNode is only assigned in Start,
// so a handler requested before Start would be built with a nil getNode —
// confirm callers start the manager first.
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
	registrationHandler := dra.NewRegistrationHandler(m.kubeClient, m.getNode)
	return cache.PluginHandler(registrationHandler)
}
// Start starts the reconcile loop of the manager. // Start starts the reconcile loop of the manager.
func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error {
m.activePods = activePods m.activePods = activePods
m.getNode = getNode
m.sourcesReady = sourcesReady m.sourcesReady = sourcesReady
go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod) go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod)
return nil return nil

View File

@ -23,13 +23,17 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
) )
// Manager manages all the DRA resource plugins running on a node. // Manager manages all the DRA resource plugins running on a node.
type Manager interface { type Manager interface {
// GetWatcherHandler returns the plugin handler for the DRA.
GetWatcherHandler() cache.PluginHandler
// Start starts the reconcile loop of the manager. // Start starts the reconcile loop of the manager.
// This will ensure that all claims are unprepared even if pods get deleted unexpectedly. // This will ensure that all claims are unprepared even if pods get deleted unexpectedly.
Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error
// PrepareResources prepares resources for a pod. // PrepareResources prepares resources for a pod.
// It communicates with the DRA resource plugin to prepare resources. // It communicates with the DRA resource plugin to prepare resources.

View File

@ -53,7 +53,7 @@ func NewFakeContainerManager() *FakeContainerManager {
} }
} }
func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
cm.Lock() cm.Lock()
defer cm.Unlock() defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "Start") cm.CalledFunctions = append(cm.CalledFunctions, "Start")
@ -124,10 +124,10 @@ func (cm *FakeContainerManager) GetCapacity(localStorageCapacityIsolation bool)
return c return c
} }
func (cm *FakeContainerManager) GetPluginRegistrationHandler() cache.PluginHandler { func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
cm.Lock() cm.Lock()
defer cm.Unlock() defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandler") cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandlers")
return nil return nil
} }

View File

@ -81,7 +81,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cloudresource" "k8s.io/kubernetes/pkg/kubelet/cloudresource"
"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle" "k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
"k8s.io/kubernetes/pkg/kubelet/cm" "k8s.io/kubernetes/pkg/kubelet/cm"
draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap" "k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -1575,7 +1574,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1) os.Exit(1)
} }
// containerManager must start after cAdvisor because it needs filesystem capacity information // containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil { if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet. // Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start ContainerManager") klog.ErrorS(err, "Failed to start ContainerManager")
os.Exit(1) os.Exit(1)
@ -1589,12 +1588,10 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
kl.containerLogManager.Start() kl.containerLogManager.Start()
// Adding Registration Callback function for CSI Driver // Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for DRA Plugin // Adding Registration Callback function for DRA Plugin and Device Plugin
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay))) kl.pluginManager.AddHandler(name, handler)
} }
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager // Start the plugin manager
klog.V(4).InfoS("Starting plugin manager") klog.V(4).InfoS("Starting plugin manager")