diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 70edb484109..6e68cc41c67 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -57,12 +57,14 @@ const ( type ActivePodsFunc func() []*v1.Pod +type GetNodeFunc func() (*v1.Node, error) + // Manages the containers running on a machine. type ContainerManager interface { // Runs the container manager's housekeeping. // - Ensures that the Docker daemon is in a container. // - Creates the system container where all non-containerized processes run. - Start(context.Context, *v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error + Start(context.Context, *v1.Node, ActivePodsFunc, GetNodeFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService, bool) error // SystemCgroupsLimit returns resources allocated to system cgroups in the machine. // These cgroups include the system and Kubernetes services. @@ -115,10 +117,10 @@ type ContainerManager interface { // GetPodCgroupRoot returns the cgroup which contains all pods. GetPodCgroupRoot() string - // GetPluginRegistrationHandler returns a plugin registration handler + // GetPluginRegistrationHandlers returns a set of plugin registration handlers // The pluginwatcher's Handlers allow to have a single module for handling // registration. - GetPluginRegistrationHandler() cache.PluginHandler + GetPluginRegistrationHandlers() map[string]cache.PluginHandler // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // due to node recreation. 
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 02f56d82418..406a4f259cc 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -46,6 +46,7 @@ import ( "k8s.io/client-go/tools/record" utilsysctl "k8s.io/component-helpers/node/util/sysctl" internalapi "k8s.io/cri-api/pkg/apis" + pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" @@ -132,6 +133,8 @@ type containerManagerImpl struct { topologyManager topologymanager.Manager // Interface for Dynamic Resource Allocation management. draManager dra.Manager + // kubeClient is the interface to the Kubernetes API server. May be nil if the kubelet is running in standalone mode. + kubeClient clientset.Interface } type features struct { @@ -312,6 +315,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I return nil, err } } + cm.kubeClient = kubeClient // Initialize CPU manager cm.cpuManager, err = cpumanager.NewManager( @@ -555,6 +559,7 @@ func (cm *containerManagerImpl) Status() Status { func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node, activePods ActivePodsFunc, + getNode GetNodeFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService, @@ -564,7 +569,7 @@ func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node, // Initialize DRA manager if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { - err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), sourcesReady) + err := cm.draManager.Start(ctx, dra.ActivePodsFunc(activePods), dra.GetNodeFunc(getNode), sourcesReady) if err != nil { return fmt.Errorf("start dra manager error: %w", err) } @@ -649,8 +654,16 @@ func (cm *containerManagerImpl) 
Start(ctx context.Context, node *v1.Node, return nil } -func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { - return cm.deviceManager.GetWatcherHandler() +func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler { + res := map[string]cache.PluginHandler{ + pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler(), + } + + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { + res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler() + } + + return res } // TODO: move the GetResources logic to PodContainerManager. diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 35aeee36724..85d51934b4c 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -46,7 +46,7 @@ type containerManagerStub struct { var _ ContainerManager = &containerManagerStub{} -func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { +func (cm *containerManagerStub) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { klog.V(2).InfoS("Starting stub container manager") return nil } @@ -91,7 +91,7 @@ func (cm *containerManagerStub) GetCapacity(localStorageCapacityIsolation bool) return c } -func (cm *containerManagerStub) GetPluginRegistrationHandler() cache.PluginHandler { +func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache.PluginHandler { return nil } diff --git a/pkg/kubelet/cm/container_manager_unsupported.go b/pkg/kubelet/cm/container_manager_unsupported.go index a384228d31a..ea30b75d586 100644 --- a/pkg/kubelet/cm/container_manager_unsupported.go +++ b/pkg/kubelet/cm/container_manager_unsupported.go @@ -40,7 +40,7 
@@ type unsupportedContainerManager struct { var _ ContainerManager = &unsupportedContainerManager{} -func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { +func (unsupportedContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { return fmt.Errorf("Container Manager is unsupported in this build") } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 232d121223a..8f95a57beab 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -35,6 +35,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" internalapi "k8s.io/cri-api/pkg/apis" + pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/admission" @@ -72,6 +73,7 @@ func (ra *noopWindowsResourceAllocator) Admit(attrs *lifecycle.PodAdmitAttribute func (cm *containerManagerImpl) Start(ctx context.Context, node *v1.Node, activePods ActivePodsFunc, + getNode GetNodeFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, runtimeService internalapi.RuntimeService, @@ -176,8 +178,9 @@ func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool) return cm.capacity } -func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { - return cm.deviceManager.GetWatcherHandler() +func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler { + // DRA is not supported on Windows, only device plugin is supported + return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: 
cm.deviceManager.GetWatcherHandler()} } func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 561b7e4e7ff..097dea4ca2d 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -59,6 +59,8 @@ type Manager interface { // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable // and inactive device plugin resources previously registered on the node. GetCapacity() (v1.ResourceList, v1.ResourceList, []string) + + // GetWatcherHandler returns the plugin handler for the device manager. GetWatcherHandler() cache.PluginHandler // GetDevices returns information about the devices assigned to pods and containers diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index a5534536b6b..50a8dd22997 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -37,6 +37,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" + "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" ) // draManagerStateFileName is the file name where dra manager stores its state @@ -48,6 +49,9 @@ const defaultReconcilePeriod = 60 * time.Second // ActivePodsFunc is a function that returns a list of pods to reconcile. type ActivePodsFunc func() []*v1.Pod +// GetNodeFunc is a function that returns the node object using the kubelet's node lister. +type GetNodeFunc func() (*v1.Node, error) + // ManagerImpl is the structure in charge of managing DRA drivers. type ManagerImpl struct { // cache contains cached claim info @@ -66,6 +70,9 @@ type ManagerImpl struct { // KubeClient reference kubeClient clientset.Interface + + // getNode is a function that returns the node object using the kubelet's node lister. 
+ getNode GetNodeFunc } // NewManagerImpl creates a new manager. @@ -90,9 +97,14 @@ func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, n return manager, nil } +func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler { + return cache.PluginHandler(dra.NewRegistrationHandler(m.kubeClient, m.getNode)) +} + // Start starts the reconcile loop of the manager. -func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error { +func (m *ManagerImpl) Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error { m.activePods = activePods + m.getNode = getNode m.sourcesReady = sourcesReady go wait.UntilWithContext(ctx, func(ctx context.Context) { m.reconcileLoop(ctx) }, m.reconcilePeriod) return nil diff --git a/pkg/kubelet/cm/dra/types.go b/pkg/kubelet/cm/dra/types.go index 266d55f3e62..eafd4914e16 100644 --- a/pkg/kubelet/cm/dra/types.go +++ b/pkg/kubelet/cm/dra/types.go @@ -23,13 +23,17 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" ) // Manager manages all the DRA resource plugins running on a node. type Manager interface { + // GetWatcherHandler returns the plugin handler for the DRA manager. + GetWatcherHandler() cache.PluginHandler + // Start starts the reconcile loop of the manager. // This will ensure that all claims are unprepared even if pods get deleted unexpectedly. - Start(ctx context.Context, activePods ActivePodsFunc, sourcesReady config.SourcesReady) error + Start(ctx context.Context, activePods ActivePodsFunc, getNode GetNodeFunc, sourcesReady config.SourcesReady) error // PrepareResources prepares resources for a pod. // It communicates with the DRA resource plugin to prepare resources. 
diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index f3f6c6eb509..6c575f6b7ff 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -53,7 +53,7 @@ func NewFakeContainerManager() *FakeContainerManager { } } -func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { +func (cm *FakeContainerManager) Start(_ context.Context, _ *v1.Node, _ ActivePodsFunc, _ GetNodeFunc, _ config.SourcesReady, _ status.PodStatusProvider, _ internalapi.RuntimeService, _ bool) error { cm.Lock() defer cm.Unlock() cm.CalledFunctions = append(cm.CalledFunctions, "Start") @@ -124,10 +124,10 @@ func (cm *FakeContainerManager) GetCapacity(localStorageCapacityIsolation bool) return c } -func (cm *FakeContainerManager) GetPluginRegistrationHandler() cache.PluginHandler { +func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache.PluginHandler { cm.Lock() defer cm.Unlock() - cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandler") + cm.CalledFunctions = append(cm.CalledFunctions, "GetPluginRegistrationHandlers") return nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 8c10db99cf7..8da0e4fb691 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -81,7 +81,6 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cloudresource" "k8s.io/kubernetes/pkg/kubelet/clustertrustbundle" "k8s.io/kubernetes/pkg/kubelet/cm" - draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/configmap" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -1575,7 +1574,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { os.Exit(1) } // containerManager must start after cAdvisor because it needs filesystem capacity information - if err := 
kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil { + if err := kl.containerManager.Start(context.TODO(), node, kl.GetActivePods, kl.getNodeAnyWay, kl.sourcesReady, kl.statusManager, kl.runtimeService, kl.supportLocalStorageCapacityIsolation()); err != nil { // Fail kubelet and rely on the babysitter to retry starting kubelet. klog.ErrorS(err, "Failed to start ContainerManager") os.Exit(1) @@ -1589,12 +1588,10 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { kl.containerLogManager.Start() // Adding Registration Callback function for CSI Driver kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler)) - // Adding Registration Callback function for DRA Plugin - if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { - kl.pluginManager.AddHandler(pluginwatcherapi.DRAPlugin, plugincache.PluginHandler(draplugin.NewRegistrationHandler(kl.kubeClient, kl.getNodeAnyWay))) + // Adding Registration Callback function for DRA Plugin and Device Plugin + for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() { + kl.pluginManager.AddHandler(name, handler) } - // Adding Registration Callback function for Device Manager - kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler()) // Start the plugin manager klog.V(4).InfoS("Starting plugin manager")