Merge pull request #112980 from swatisehgal/devicemanager-ga-graduation

node: devicemgr: Graduate Kubelet DeviceManager to GA
This commit is contained in:
Kubernetes Prow Robot 2022-11-02 13:17:01 -07:00 committed by GitHub
commit 25dc4c4f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 58 additions and 156 deletions

View File

@ -256,14 +256,10 @@ if [[ (( "${KUBE_FEATURE_GATES:-}" == *"AllAlpha=true"* ) || ( "${KUBE_FEATURE_G
fi
# Optional: set feature gates
# shellcheck disable=SC2034 # Variables sourced in other scripts.
FEATURE_GATES="${KUBE_FEATURE_GATES:-}"
if [[ -n "${NODE_ACCELERATORS}" ]]; then
if [[ -z "${FEATURE_GATES:-}" ]]; then
FEATURE_GATES="DevicePlugins=true"
else
FEATURE_GATES="${FEATURE_GATES},DevicePlugins=true"
fi
if [[ "${NODE_ACCELERATORS}" =~ .*type=([a-zA-Z0-9-]+).* ]]; then
NON_MASTER_NODE_LABELS="${NON_MASTER_NODE_LABELS},cloud.google.com/gke-accelerator=${BASH_REMATCH[1]}"
fi

View File

@ -157,6 +157,7 @@ if [[ (( "${KUBE_FEATURE_GATES:-}" = *"AllAlpha=true"* ) || ( "${KUBE_FEATURE_GA
fi
# Optional: set feature gates
# shellcheck disable=SC2034 # Variables sourced in other scripts.
FEATURE_GATES=${KUBE_FEATURE_GATES:-}
#Optional: disable the cloud provider no schedule taint for testing.
@ -305,11 +306,6 @@ if [[ ${KUBE_ENABLE_INSECURE_REGISTRY:-false} = 'true' ]]; then
fi
if [[ -n "${NODE_ACCELERATORS}" ]]; then
if [[ -z "${FEATURE_GATES:-}" ]]; then
FEATURE_GATES='DevicePlugins=true'
else
FEATURE_GATES="${FEATURE_GATES},DevicePlugins=true"
fi
if [[ "${NODE_ACCELERATORS}" =~ .*type=([a-zA-Z0-9-]+).* ]]; then
NON_MASTER_NODE_LABELS="${NON_MASTER_NODE_LABELS},cloud.google.com/gke-accelerator=${BASH_REMATCH[1]}"
fi

View File

@ -704,8 +704,6 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
return err
}
devicePluginEnabled := utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins)
var cpuManagerPolicyOptions map[string]string
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManager) {
if utilfeature.DefaultFeatureGate.Enabled(features.CPUManagerPolicyOptions) {
@ -751,7 +749,6 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {

View File

@ -229,8 +229,10 @@ const (
// NodePublishVolume calls.
DelegateFSGroupToCSIDriver featuregate.Feature = "DelegateFSGroupToCSIDriver"
// owner: @jiayingz
// owner: @jiayingz, @swatisehgal (for GA graduation)
// alpha: v1.8
// beta: v1.10
// GA: v1.26
//
// Enables support for Device Plugins
DevicePlugins featuregate.Feature = "DevicePlugins"
@ -875,7 +877,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
DelegateFSGroupToCSIDriver: {Default: true, PreRelease: featuregate.Beta},
DevicePlugins: {Default: true, PreRelease: featuregate.Beta},
DevicePlugins: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.26
DisableAcceleratorUsageMetrics: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},

View File

@ -194,7 +194,7 @@ func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
// TODO(vmarmol): Add limits to the system containers.
// Takes the absolute name of the specified containers.
// Empty container name disables use of the specified container.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
@ -298,16 +298,12 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager = topologymanager.NewFakeManager()
}
klog.InfoS("Creating device plugin manager", "devicePluginEnabled", devicePluginEnabled)
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
cm.deviceManager, err = devicemanager.NewManagerStub()
}
klog.InfoS("Creating device plugin manager")
cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
if err != nil {
return nil, err
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {

View File

@ -42,6 +42,6 @@ func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ config.
return fmt.Errorf("Container Manager is unsupported in this build")
}
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
return &unsupportedContainerManager{}, nil
}

View File

@ -93,7 +93,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}
// NewContainerManager creates windows container manager.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
// machine info is computed and cached once as part of cAdvisor object creation.
// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
@ -111,16 +111,12 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cm.topologyManager = topologymanager.NewFakeManager()
klog.InfoS("Creating device plugin manager", "devicePluginEnabled", devicePluginEnabled)
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
cm.deviceManager, err = devicemanager.NewManagerStub()
}
klog.InfoS("Creating device plugin manager")
cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
if err != nil {
return nil, err
}
cm.topologyManager.AddHintProvider(cm.deviceManager)
return cm, nil
}

View File

@ -74,10 +74,10 @@ func (dev DevicesPerNUMA) Devices() sets.String {
// New returns an instance of Checkpoint - must be an alias for the most recent version
func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return NewV2(devEntries, devices)
return newV2(devEntries, devices)
}
func NewV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
func newV2(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint {
return &Data{
Data: checkpointData{
PodDeviceEntries: devEntries,

View File

@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
// PodDevicesEntry connects pod information to devices, without topology information (k8s <= 1.19)
// PodDevicesEntryV1 connects pod information to devices, without topology information (k8s <= 1.19)
type PodDevicesEntryV1 struct {
PodUID string
ContainerName string
@ -37,7 +37,7 @@ type PodDevicesEntryV1 struct {
AllocResp []byte
}
// checkpointData struct is used to store pod to device allocation information
// checkpointDataV1 struct is used to store pod to device allocation information
// in a checkpoint file, without topology information (k8s <= 1.19)
type checkpointDataV1 struct {
PodDeviceEntries []PodDevicesEntryV1
@ -63,13 +63,13 @@ func (cp checkpointDataV1) checksum() checksum.Checksum {
return checksum.Checksum(hash.Sum32())
}
// Data holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
// DataV1 holds checkpoint data and its checksum, in V1 (k8s <= 1.19) format
type DataV1 struct {
// Data is the V1-format checkpoint payload.
Data checkpointDataV1
// Checksum is the checksum kept alongside Data.
Checksum checksum.Checksum
}
// New returns an instance of Checkpoint, in V1 (k8s <= 1.19) format.
// NewV1 returns an instance of Checkpoint, in V1 (k8s <= 1.19) format.
// Users should avoid creating checkpoints in formats different than the most recent one,
// use the old formats only to validate existing checkpoint and convert them to most recent
// format. The only exception should be test code.
@ -90,7 +90,7 @@ func (cp *DataV1) MarshalCheckpoint() ([]byte, error) {
return json.Marshal(*cp)
}
// MarshalCheckpoint returns marshalled data
// UnmarshalCheckpoint decodes the JSON-encoded blob into cp and returns
// any error reported by the decoder.
func (cp *DataV1) UnmarshalCheckpoint(blob []byte) error {
	if err := json.Unmarshal(blob, cp); err != nil {
		return err
	}
	return nil
}

View File

@ -50,7 +50,7 @@ type endpointImpl struct {
// This is to be used during normal device plugin registration.
func newEndpointImpl(p plugin.DevicePlugin) *endpointImpl {
return &endpointImpl{
api: p.Api(),
api: p.API(),
resourceName: p.Resource(),
}
}

View File

@ -33,9 +33,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
errorsutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
@ -165,6 +163,8 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
return manager, nil
}
// CleanupPluginDirectory is to remove all existing unix sockets
// from /var/lib/kubelet/device-plugins on Device Plugin Manager start
func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
d, err := os.Open(dir)
if err != nil {
@ -202,8 +202,10 @@ func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
return errorsutil.NewAggregate(errs)
}
// PluginConnected is to connect a plugin to a new endpoint.
// This is done as part of device plugin registration.
func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error {
options, err := p.Api().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
options, err := p.API().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
if err != nil {
return fmt.Errorf("failed to get device plugin options: %v", err)
}
@ -217,6 +219,8 @@ func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin
return nil
}
// PluginDisconnected is to disconnect a plugin from an endpoint.
// This is done as part of device plugin deregistration.
func (m *ManagerImpl) PluginDisconnected(resourceName string) {
m.mutex.Lock()
defer m.mutex.Unlock()
@ -229,6 +233,10 @@ func (m *ManagerImpl) PluginDisconnected(resourceName string) {
m.endpoints[resourceName].e.setStopTime(time.Now())
}
// PluginListAndWatchReceiver receives ListAndWatchResponse from a device plugin
// and ensures that an up-to-date state (e.g. number of devices and device health)
// is captured. Also, registered device and device to container allocation
// information is checkpointed to the disk.
func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
var devices []pluginapi.Device
for _, d := range resp.Devices {
@ -1005,14 +1013,11 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceIns
// depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
// has been recreated.
func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
checkpoints, err := m.checkpointManager.ListCheckpoints()
if err != nil {
return false
}
return len(checkpoints) == 0
checkpoints, err := m.checkpointManager.ListCheckpoints()
if err != nil {
return false
}
return false
return len(checkpoints) == 0
}
func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {

View File

@ -1,99 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devicemanager
import (
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
// ManagerStub provides a no-op implementation of the Device Manager.
// Every method ignores its arguments and returns a fixed zero or empty result.
type ManagerStub struct{}

// NewManagerStub creates a ManagerStub. The returned error is always nil.
func NewManagerStub() (*ManagerStub, error) {
	return &ManagerStub{}, nil
}

// Start does nothing and returns nil.
func (m *ManagerStub) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	return nil
}

// Stop does nothing and returns nil.
func (m *ManagerStub) Stop() error {
	return nil
}

// Allocate does nothing and returns nil.
func (m *ManagerStub) Allocate(pod *v1.Pod, container *v1.Container) error {
	return nil
}

// UpdatePluginResources does nothing and returns nil.
func (m *ManagerStub) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return nil
}

// GetDeviceRunContainerOptions returns nil options and a nil error.
func (m *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
	return nil, nil
}

// GetCapacity returns two nil resource lists and an empty, non-nil
// list of removed resource names.
func (m *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	return nil, nil, []string{}
}

// GetWatcherHandler returns a nil plugin handler.
func (m *ManagerStub) GetWatcherHandler() cache.PluginHandler {
	return nil
}

// GetTopologyHints returns an empty, non-nil TopologyHint map.
func (m *ManagerStub) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	return map[string][]topologymanager.TopologyHint{}
}

// GetPodTopologyHints returns an empty, non-nil TopologyHint map.
func (m *ManagerStub) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
	return map[string][]topologymanager.TopologyHint{}
}

// GetDevices returns nil regardless of the pod UID and container name given.
func (m *ManagerStub) GetDevices(_, _ string) ResourceDeviceInstances {
	return nil
}

// GetAllocatableDevices returns nil.
func (m *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances {
	return nil
}

// ShouldResetExtendedResourceCapacity always returns false.
func (m *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
	return false
}

// UpdateAllocatedDevices does nothing.
func (m *ManagerStub) UpdateAllocatedDevices() {}

View File

@ -20,10 +20,13 @@ import (
api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
// RegistrationHandler is the contract for components able to clean up
// the device plugin directory; CleanupPluginDirectory is its only method.
type RegistrationHandler interface {
	// CleanupPluginDirectory takes the directory as a string and reports
	// failure through the returned error.
	CleanupPluginDirectory(dir string) error
}
// ClientHandler is an interface for handling device plugin connections.
type ClientHandler interface {
PluginConnected(string, DevicePlugin) error
PluginDisconnected(string)

View File

@ -30,12 +30,14 @@ import (
api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
// DevicePlugin interface provides methods for accessing Device Plugin resources, API and unix socket.
type DevicePlugin interface {
Api() api.DevicePluginClient
API() api.DevicePluginClient
Resource() string
SocketPath() string
}
// Client interface provides methods for establishing/closing gRPC connection and running the device plugin gRPC client.
type Client interface {
Connect() error
Run()
@ -51,6 +53,7 @@ type client struct {
client api.DevicePluginClient
}
// NewPluginClient returns an initialized device plugin client.
func NewPluginClient(r string, socketPath string, h ClientHandler) Client {
return &client{
resource: r,
@ -59,6 +62,7 @@ func NewPluginClient(r string, socketPath string, h ClientHandler) Client {
}
}
// Connect is for establishing a gRPC connection between device manager and device plugin.
func (c *client) Connect() error {
client, conn, err := dial(c.socket)
if err != nil {
@ -70,6 +74,7 @@ func (c *client) Connect() error {
return c.handler.PluginConnected(c.resource, c)
}
// Run is for running the device plugin gRPC client.
func (c *client) Run() {
stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{})
if err != nil {
@ -88,6 +93,7 @@ func (c *client) Run() {
}
}
// Disconnect is for closing gRPC connection between device manager and device plugin.
func (c *client) Disconnect() error {
c.mutex.Lock()
if c.grpc != nil {
@ -105,7 +111,7 @@ func (c *client) Resource() string {
return c.resource
}
func (c *client) Api() api.DevicePluginClient {
func (c *client) API() api.DevicePluginClient {
return c.client
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
// Server interface provides methods for Device plugin registration server.
type Server interface {
cache.PluginHandler
Start() error
@ -54,6 +55,7 @@ type server struct {
clients map[string]Client
}
// NewServer returns an initialized device plugin registration server.
func NewServer(socketPath string, rh RegistrationHandler, ch ClientHandler) (Server, error) {
if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(errBadSocket+" %s", socketPath)

View File

@ -343,11 +343,11 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDev
}
devicePluginMap := make(map[string]pluginapi.Device)
for numaid, devlist := range allocateInfo.deviceIds {
for _, devId := range devlist {
for _, devID := range devlist {
var topology *pluginapi.TopologyInfo
if numaid != nodeWithoutTopology {
NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
if pDev, ok := devicePluginMap[devId]; ok && pDev.Topology != nil {
if pDev, ok := devicePluginMap[devID]; ok && pDev.Topology != nil {
if nodes := pDev.Topology.GetNodes(); nodes != nil {
NUMANodes = append(NUMANodes, nodes...)
}
@ -356,7 +356,7 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDev
// ID and Healthy are not relevant here.
topology = &pluginapi.TopologyInfo{Nodes: NUMANodes}
}
devicePluginMap[devId] = pluginapi.Device{
devicePluginMap[devID] = pluginapi.Device{
Topology: topology,
}
}
@ -372,10 +372,12 @@ type DeviceInstances map[string]pluginapi.Device
// ResourceDeviceInstances is a mapping resource name -> DeviceInstances
type ResourceDeviceInstances map[string]DeviceInstances
// NewResourceDeviceInstances creates an empty, non-nil ResourceDeviceInstances
// map that is ready to be written to.
func NewResourceDeviceInstances() ResourceDeviceInstances {
	return ResourceDeviceInstances{}
}
// Clone returns a clone of ResourceDeviceInstances
func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances {
clone := NewResourceDeviceInstances()
for resourceName, resourceDevs := range rdev {

View File

@ -42,14 +42,14 @@ func TestGetContainerDevices(t *testing.T) {
contDevices, ok := resContDevices[resourceName1]
require.True(t, ok, "resource %q not present", resourceName1)
for devId, plugInfo := range contDevices {
for devID, plugInfo := range contDevices {
nodes := plugInfo.GetTopology().GetNodes()
require.Equal(t, len(nodes), len(devices), "Incorrect container devices: %v - %v (nodes %v)", devices, contDevices, nodes)
for _, node := range plugInfo.GetTopology().GetNodes() {
dev, ok := devices[node.ID]
require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID)
require.Equal(t, devId, dev[0], "Can't find device %s in result", dev[0])
require.Equal(t, devID, dev[0], "Can't find device %s in result", dev[0])
}
}
}