Merge pull request #128298 from SergeyKanzhelev/convergePluginRegistrationForDRAAndDP

converge DRA and Device Plugin plugin registration
Authored by Kubernetes Prow Robot on 2024-11-04 11:41:28 +00:00, committed by GitHub (commit c6ea102f5f)
10 changed files with 56 additions and 23 deletions
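
Summary of the change: previously the kubelet obtained a single device plugin handler from ContainerManager.GetPluginRegistrationHandler() and wired the DRA registration handler itself behind the DynamicResourceAllocation feature gate; with this PR the container manager owns both handlers and returns them as a map keyed by plugin type, which the kubelet registers in one loop. Below is a minimal, self-contained sketch of that pattern; PluginHandler, the two handler types, and the string keys are simplified stand-ins for the kubelet's pluginmanager and pluginregistration types, not the real API.

    package main

    import "fmt"

    // PluginHandler is a simplified stand-in for the kubelet's
    // pluginmanager/cache.PluginHandler; only the shape of the pattern
    // matters here.
    type PluginHandler interface {
        Describe() string
    }

    type deviceHandler struct{}

    func (deviceHandler) Describe() string { return "device plugin registration handler" }

    type draHandler struct{}

    func (draHandler) Describe() string { return "DRA registration handler" }

    // getPluginRegistrationHandlers mirrors the converged API: one call that
    // returns every handler the container manager owns, keyed by plugin type.
    func getPluginRegistrationHandlers(draEnabled bool) map[string]PluginHandler {
        handlers := map[string]PluginHandler{
            "DevicePlugin": deviceHandler{},
        }
        if draEnabled {
            handlers["DRAPlugin"] = draHandler{}
        }
        return handlers
    }

    func main() {
        // The caller no longer special-cases plugin types; it registers
        // whatever the container manager hands back.
        for name, h := range getPluginRegistrationHandlers(true) {
            fmt.Printf("AddHandler(%s): %s\n", name, h.Describe())
        }
    }

Keeping the handlers behind the container manager also moves the DynamicResourceAllocation feature-gate check out of kubelet.go and next to the DRA manager it guards, as the hunks below show.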

@@ -57,12 +57,14 @@ const (
type ActivePodsFunc func() []*v1.Pod
type GetNodeFunc func() (*v1.Node, error)
// Manages the containers running on a machine.
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(context.Context, *v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error
Start(context.Context, *v1.Node, ActivePodsFunc, GetNodeFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error
// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
@@ -115,10 +117,10 @@ type ContainerManager interface {
// GetPodCgroupRoot returns the cgroup which contains all pods.
GetPodCgroupRoot() string
// GetPluginRegistrationHandler returns a plugin registration handler
// GetPluginRegistrationHandlers returns a set of plugin registration handlers
// The pluginwatcher's Handlers allow to have a single module for handling
// registration.
GetPluginRegistrationHandler() cache.PluginHandler
GetPluginRegistrationHandlers() map[string]cache.PluginHandler
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
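
The new GetNodeFunc parameter is just a nullary callback returning the node object (or an error); the kubelet satisfies it with kl.getNodeAnyWay, as the kubelet.go hunk at the end of this diff shows. A minimal sketch of building such a callback, assuming only the v1.Node type from k8s.io/api; the nodeCache type and the node name are made up for illustration:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // GetNodeFunc matches the signature added to the ContainerManager interface.
    type GetNodeFunc func() (*v1.Node, error)

    // nodeCache is a made-up stand-in for the kubelet's node source; it only
    // exists to show how a GetNodeFunc can be backed by cached state.
    type nodeCache struct {
        node *v1.Node
    }

    func (c *nodeCache) getNode() (*v1.Node, error) {
        if c.node == nil {
            return nil, fmt.Errorf("node object not available yet")
        }
        return c.node, nil
    }

    func main() {
        cache := &nodeCache{node: &v1.Node{}}
        cache.node.Name = "worker-0"

        // A method value satisfies the function type; this is the kind of
        // callback that gets threaded from the kubelet through
        // ContainerManager.Start into the DRA manager.
        var getNode GetNodeFunc = cache.getNode

        node, err := getNode()
        if err != nil {
            fmt.Println("error:", err)
            return
        }
        fmt.Println("node:", node.Name)
    }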

@@ -46,6 +46,7 @@ import (
"k8s.io/client-go/tools/record"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
internalapi "k8s.io/cri-api/pkg/apis"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
@@ -132,6 +133,8 @@ type containerManagerImpl struct {
topologyManager topologymanager.Manager
// Interface for Dynamic Resource Allocation management.
draManager dra.Manager
// kubeClient is the interface to the Kubernetes API server. May be nil if the kubelet is running in standalone mode.
kubeClient clientset.Interface
}
type features struct {
@@ -312,6 +315,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}
}
cm.kubeClient = kubeClient
// Initialize CPU manager
cm.cpuManager, err = cpumanager.NewManager(
@@ -555,6 +559,7 @@ func (cm *containerManagerImpl) Status() Status {
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
activePods ActivePodsFunc,
getNode GetNodeFunc,
sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService,
@@ -564,7 +569,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
// Initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), sourcesReady)
err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), dra.GetNodeFunc(getNode), sourcesReady)
if err != nil {
return fmt.Errorf("start dra manager error: %w", err)
}
@@ -649,8 +654,16 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
return nil
}
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler()
func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
res := map[string]cache.PluginHandler{
pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler(),
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler()
}
return res
}
// TODO: move the GetResources logic to PodContainerManager.
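
Note the explicit conversions in the Start hunk above: dra.ActivePodsFunc(activePods) and dra.GetNodeFunc(getNode). The cm and dra packages each define their own function types with identical signatures, and Go does not allow assigning one defined function type to another without a conversion. A condensed illustration, with two locally defined types standing in for cm.GetNodeFunc and dra.GetNodeFunc:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    // Stand-ins for the identically shaped function types declared in the cm
    // and dra packages.
    type cmGetNodeFunc func() (*v1.Node, error)
    type draGetNodeFunc func() (*v1.Node, error)

    // startDRA plays the role of the DRA manager's Start, which wants its own
    // package's callback type.
    func startDRA(getNode draGetNodeFunc) error {
        node, err := getNode()
        if err != nil {
            return err
        }
        fmt.Println("DRA manager sees node:", node.Name)
        return nil
    }

    func main() {
        var getNode cmGetNodeFunc = func() (*v1.Node, error) {
            n := &v1.Node{}
            n.Name = "worker-0"
            return n, nil
        }

        // Both types are defined (named) types, so Go requires an explicit
        // conversion even though the underlying signatures are identical;
        // this is why the code above spells out dra.GetNodeFunc(getNode).
        if err := startDRA(draGetNodeFunc(getNode)); err != nil {
            fmt.Println("error:", err)
        }
    }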

@@ -46,7 +46,7 @@ type containerManagerStub struct {
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
klog.V(2).InfoS("Starting stub container manager")
return nil
}
@@ -91,7 +91,7 @@ func (cm *containerManagerStub) GetCapacity(localStorageCapacityIsolation bool)
return c
}
func (cm *containerManagerStub) GetPluginRegistrationHandler() cache.PluginHandler {
func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
return nil
}

@@ -40,7 +40,7 @@ type unsupportedContainerManager struct {
var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}

@@ -35,6 +35,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
internalapi "k8s.io/cri-api/pkg/apis"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/admission"
@@ -72,6 +73,7 @@ func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttribute
func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node,
activePods ActivePodsFunc,
getNode GetNodeFunc,
sourcesReady config.SourcesReady,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService,
@@ -176,8 +178,9 @@ func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool)
return cm.capacity
}
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler()
func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
// DRA is not supported on Windows, only device plugin is supported
return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler()}
}
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {

@@ -59,6 +59,8 @@ type Manager interface {
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
// GetWatcherHandler returns the plugin handler for the device manager.
GetWatcherHandler() cache.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers

@@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
// draManagerStateFileName is the file name where dra manager stores its state
@@ -48,6 +49,9 @@ const defaultReconcilePeriod = 60 * time.Second
// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod
// GetNodeFunc is a function that returns the node object using the kubelet's node lister.
type GetNodeFunc func() (*v1.Node, error)
// ManagerImpl is the structure in charge of managing DRA drivers.
type ManagerImpl struct {
// cache contains cached claim info
@@ -66,6 +70,9 @@ type ManagerImpl struct {
// KubeClient reference
kubeClient clientset.Interface
// getNode is a function that returns the node object using the kubelet's node lister.
getNode GetNodeFunc
}
// NewManagerImpl creates a new manager.
@@ -90,9 +97,14 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n
return manager, nil
}
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode))
}
// Start starts the reconcile loop of the manager.
func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error {
m.activePods = activePods
m.getNode = getNode
m.sourcesReady = sourcesReady
go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod)
return nil
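
One ordering detail: GetWatcherHandler constructs the registration handler from m.kubeClient and m.getNode, and m.getNode is only assigned in Start. In the kubelet.go hunk at the end of this diff, containerManager.Start runs before GetPluginRegistrationHandlers is consulted, so the field is populated by the time the handler is built; if the order were reversed, the handler would capture a nil callback, because Go copies function values when they are passed. A made-up sketch of that behaviour (manager and newHandler are illustrative stand-ins, not the DRA manager's real types):

    package main

    import "fmt"

    type getNameFunc func() string

    // manager stands in for the DRA manager: its callback field stays nil
    // until Start is called.
    type manager struct {
        getName getNameFunc
    }

    func (m *manager) Start(getName getNameFunc) {
        m.getName = getName
    }

    // newHandler plays the role of the registration-handler constructor: it
    // receives the current value of the field, not a reference to it.
    func newHandler(getName getNameFunc) func() {
        return func() {
            if getName == nil {
                fmt.Println("handler built before Start: callback is nil")
                return
            }
            fmt.Println("handler sees node:", getName())
        }
    }

    func main() {
        m := &manager{}

        early := newHandler(m.getName) // built before Start: captures nil
        m.Start(func() string { return "worker-0" })
        late := newHandler(m.getName) // built after Start: captures the callback

        early()
        late()
    }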

@@ -23,13 +23,17 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
// Manager manages all the DRA resource plugins running on a node.
type Manager interface {
// GetWatcherHandler returns the plugin handler for the DRA.
GetWatcherHandler() cache.PluginHandler
// Start starts the reconcile loop of the manager.
// This will ensure that all claims are unprepared even if pods get deleted unexpectedly.
Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error
// PrepareResources prepares resources for a pod.
// It communicates with the DRA resource plugin to prepare resources.
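
The device manager (above) and the DRA manager now expose the same accessor, GetWatcherHandler() cache.PluginHandler, which is what lets the container manager assemble the handler map without knowing how each subsystem builds its handler. A small sketch of that shape; every type below is a simplified stand-in rather than a real kubelet type:

    package main

    import "fmt"

    // PluginHandler and the two managers below are simplified stand-ins.
    type PluginHandler interface {
        Describe() string
    }

    type namedHandler string

    func (h namedHandler) Describe() string { return string(h) }

    // watcherHandlerProvider captures the accessor both managers now share.
    type watcherHandlerProvider interface {
        GetWatcherHandler() PluginHandler
    }

    type deviceManager struct{}

    func (deviceManager) GetWatcherHandler() PluginHandler { return namedHandler("device plugin handler") }

    type draManager struct{}

    func (draManager) GetWatcherHandler() PluginHandler { return namedHandler("DRA handler") }

    func main() {
        providers := map[string]watcherHandlerProvider{
            "DevicePlugin": deviceManager{},
            "DRAPlugin":    draManager{},
        }
        // The container manager can treat both subsystems the same way.
        for name, p := range providers {
            fmt.Printf("%s -> %s\n", name, p.GetWatcherHandler().Describe())
        }
    }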

@@ -53,7 +53,7 @@ func NewFakeContainerManager() *FakeContainerManager {
}
}
func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "Start")
@@ -124,10 +124,10 @@ func (cm *FakeContainerManager) GetCapacity(localStorageCapacityIsolation bool)
return c
}
func (cm *FakeContainerManager) GetPluginRegistrationHandler() cache.PluginHandler {
func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandler")
cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandlers")
return nil
}

@@ -81,7 +81,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cloudresource"
"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
"k8s.io/kubernetes/pkg/kubelet/cm"
draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -1575,7 +1574,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
os.Exit(1)
}
// containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start ContainerManager")
os.Exit(1)
@@ -1589,12 +1588,10 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
kl.containerLogManager.Start()
// Adding Registration Callback function for CSI Driver
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for DRA Plugin
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay)))
// Adding Registration Callback function for DRA Plugin and Device Plugin
for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
kl.pluginManager.AddHandler(name, handler)
}
// Adding Registration Callback function for Device Manager
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager
klog.V(4).InfoS("Starting plugin manager")
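
To make the end result concrete, here is a hypothetical check of which plugin types end up registered with and without the DynamicResourceAllocation gate, written against simplified stand-ins rather than the real feature-gate and pluginmanager machinery:

    package main

    import (
        "fmt"
        "sort"
    )

    // handlersFor mimics the converged behaviour: the device plugin handler is
    // always present, the DRA handler only when the DynamicResourceAllocation
    // feature gate is on. The string keys stand in for the
    // pluginregistration/v1 plugin type constants.
    func handlersFor(draEnabled bool) map[string]string {
        h := map[string]string{"DevicePlugin": "device manager handler"}
        if draEnabled {
            h["DRAPlugin"] = "DRA manager handler"
        }
        return h
    }

    func registeredTypes(draEnabled bool) []string {
        var types []string
        for name := range handlersFor(draEnabled) {
            types = append(types, name) // the kubelet would call pluginManager.AddHandler(name, handler) here
        }
        sort.Strings(types)
        return types
    }

    func main() {
        fmt.Println("gate off:", registeredTypes(false)) // [DevicePlugin]
        fmt.Println("gate on: ", registeredTypes(true))  // [DevicePlugin DRAPlugin]
    }

Note that the CSI handler keeps its direct AddHandler call above; only the device plugin and DRA registrations are converged by this PR.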