mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #58755 from vikaschoudhary16/probing-mode
Automatic merge from submit-queue (batch tested with PRs 58755, 66414). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Use probe based plugin watcher mechanism in Device Manager **What this PR does / why we need it**: Uses this probe based utility in the device plugin manager. **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #56944 **Notes For Reviewers**: Changes are backward compatible and existing device plugins will continue to work. At the same time, any new plugins that has required support for probing model (Identity service implementation), will also work. **Release note** ```release-note Add support kubelet plugin watcher in device manager. ``` /sig node /area hw-accelerators /cc /cc @jiayingz @RenaudWasTaken @vishh @ScorpioCPH @sjenning @derekwaynecarr @jeremyeder @lichuqiang @tengqm @saad-ali @chakri-nelluri @ConnorDoyle
This commit is contained in:
commit
32e38b6659
@ -44,6 +44,7 @@ go_library(
|
||||
"//pkg/kubelet/apis/cri:go_default_library",
|
||||
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
|
||||
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/cadvisor:go_default_library",
|
||||
"//pkg/kubelet/certificate:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
|
@ -32,6 +32,7 @@ go_library(
|
||||
"//pkg/kubelet/eviction/api:go_default_library",
|
||||
"//pkg/kubelet/lifecycle:go_default_library",
|
||||
"//pkg/kubelet/status:go_default_library",
|
||||
"//pkg/kubelet/util/pluginwatcher:go_default_library",
|
||||
"//pkg/scheduler/cache:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
|
||||
"fmt"
|
||||
@ -94,6 +95,7 @@ type ContainerManager interface {
|
||||
|
||||
// GetPodCgroupRoot returns the cgroup which contains all pods.
|
||||
GetPodCgroupRoot() string
|
||||
GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn
|
||||
}
|
||||
|
||||
type NodeConfig struct {
|
||||
|
@ -52,6 +52,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
utilfile "k8s.io/kubernetes/pkg/util/file"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
@ -600,6 +601,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *containerManagerImpl) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn {
|
||||
return cm.deviceManager.GetWatcherCallback()
|
||||
}
|
||||
|
||||
// TODO: move the GetResources logic to PodContainerManager.
|
||||
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
|
||||
opts := &kubecontainer.RunContainerOptions{}
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
)
|
||||
|
||||
@ -76,6 +77,12 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
|
||||
return c
|
||||
}
|
||||
|
||||
func (cm *containerManagerStub) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn {
|
||||
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
||||
return nil, nil, []string{}
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/apis/core/v1/helper:go_default_library",
|
||||
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager/errors:go_default_library",
|
||||
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
|
||||
@ -22,6 +23,7 @@ go_library(
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/lifecycle:go_default_library",
|
||||
"//pkg/kubelet/metrics:go_default_library",
|
||||
"//pkg/kubelet/util/pluginwatcher:go_default_library",
|
||||
"//pkg/scheduler/cache:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
@ -40,8 +42,10 @@ go_test(
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
|
||||
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
|
||||
"//pkg/kubelet/checkpointmanager:go_default_library",
|
||||
"//pkg/kubelet/lifecycle:go_default_library",
|
||||
"//pkg/kubelet/util/pluginwatcher:go_default_library",
|
||||
"//pkg/scheduler/cache:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
|
@ -28,12 +28,15 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
|
||||
)
|
||||
|
||||
// Stub implementation for DevicePlugin.
|
||||
type Stub struct {
|
||||
devs []*pluginapi.Device
|
||||
socket string
|
||||
devs []*pluginapi.Device
|
||||
socket string
|
||||
resourceName string
|
||||
preStartContainerFlag bool
|
||||
|
||||
stop chan interface{}
|
||||
wg sync.WaitGroup
|
||||
@ -43,6 +46,10 @@ type Stub struct {
|
||||
|
||||
// allocFunc is used for handling allocation request
|
||||
allocFunc stubAllocFunc
|
||||
|
||||
registrationStatus chan watcherapi.RegistrationStatus // for testing
|
||||
endpoint string // for testing
|
||||
|
||||
}
|
||||
|
||||
// stubAllocFunc is the function called when receive an allocation request from Kubelet
|
||||
@ -55,10 +62,12 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De
|
||||
}
|
||||
|
||||
// NewDevicePluginStub returns an initialized DevicePlugin Stub.
|
||||
func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub {
|
||||
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool) *Stub {
|
||||
return &Stub{
|
||||
devs: devs,
|
||||
socket: socket,
|
||||
devs: devs,
|
||||
socket: socket,
|
||||
resourceName: name,
|
||||
preStartContainerFlag: preStartContainerFlag,
|
||||
|
||||
stop: make(chan interface{}),
|
||||
update: make(chan []*pluginapi.Device),
|
||||
@ -88,6 +97,7 @@ func (m *Stub) Start() error {
|
||||
m.wg.Add(1)
|
||||
m.server = grpc.NewServer([]grpc.ServerOption{}...)
|
||||
pluginapi.RegisterDevicePluginServer(m.server, m)
|
||||
watcherapi.RegisterRegistrationServer(m.server, m)
|
||||
|
||||
go func() {
|
||||
defer m.wg.Done()
|
||||
@ -118,8 +128,36 @@ func (m *Stub) Stop() error {
|
||||
return m.cleanup()
|
||||
}
|
||||
|
||||
// GetInfo is the RPC which return pluginInfo
|
||||
func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) {
|
||||
log.Println("GetInfo")
|
||||
return &watcherapi.PluginInfo{
|
||||
Type: watcherapi.DevicePlugin,
|
||||
Name: m.resourceName,
|
||||
Endpoint: m.endpoint,
|
||||
SupportedVersions: []string{pluginapi.Version}}, nil
|
||||
}
|
||||
|
||||
// NotifyRegistrationStatus receives the registration notification from watcher
|
||||
func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.RegistrationStatus) (*watcherapi.RegistrationStatusResponse, error) {
|
||||
if m.registrationStatus != nil {
|
||||
m.registrationStatus <- *status
|
||||
}
|
||||
if !status.PluginRegistered {
|
||||
log.Println("Registration failed: ", status.Error)
|
||||
}
|
||||
return &watcherapi.RegistrationStatusResponse{}, nil
|
||||
}
|
||||
|
||||
// Register registers the device plugin for the given resourceName with Kubelet.
|
||||
func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerFlag bool) error {
|
||||
func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error {
|
||||
if pluginSockDir != "" {
|
||||
if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil {
|
||||
log.Println("Deprecation file found. Skip registration.")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
log.Println("Deprecation file not found. Invoke registration")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@ -136,7 +174,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF
|
||||
Version: pluginapi.Version,
|
||||
Endpoint: path.Base(m.socket),
|
||||
ResourceName: resourceName,
|
||||
Options: &pluginapi.DevicePluginOptions{PreStartRequired: preStartContainerFlag},
|
||||
Options: &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag},
|
||||
}
|
||||
|
||||
_, err = client.Register(context.Background(), reqt)
|
||||
@ -148,7 +186,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF
|
||||
|
||||
// GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin.
|
||||
func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
|
||||
return &pluginapi.DevicePluginOptions{}, nil
|
||||
return &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag}, nil
|
||||
}
|
||||
|
||||
// PreStartContainer resets the devices received
|
||||
|
@ -180,7 +180,7 @@ func TestGetDevices(t *testing.T) {
|
||||
}
|
||||
|
||||
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
|
||||
p := NewDevicePluginStub(devs, socket)
|
||||
p := NewDevicePluginStub(devs, socket, resourceName, false)
|
||||
|
||||
err := p.Start()
|
||||
require.NoError(t, err)
|
||||
|
@ -39,6 +39,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
)
|
||||
|
||||
@ -239,6 +240,43 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetWatcherCallback returns callback function to be registered with plugin watcher
|
||||
func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn {
|
||||
if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
|
||||
glog.Errorf("Failed to create deprecation file at %s", m.socketdir)
|
||||
} else {
|
||||
f.Close()
|
||||
glog.V(4).Infof("created deprecation file %s", f.Name())
|
||||
}
|
||||
|
||||
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
|
||||
if !m.isVersionCompatibleWithPlugin(versions) {
|
||||
return nil, fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
|
||||
}
|
||||
|
||||
if !v1helper.IsExtendedResourceName(v1.ResourceName(name)) {
|
||||
return nil, fmt.Errorf("invalid name of device plugin socket: %v", fmt.Sprintf(errInvalidResourceName, name))
|
||||
}
|
||||
|
||||
return m.addEndpointProbeMode(name, sockPath)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
|
||||
// TODO(vikasc): Currently this is fine as we only have a single supported version. When we do need to support
|
||||
// multiple versions in the future, we may need to extend this function to return a supported version.
|
||||
// E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin,
|
||||
// this function should return v1beta1
|
||||
for _, version := range versions {
|
||||
for _, supportedVersion := range pluginapi.SupportedVersions {
|
||||
if version == supportedVersion {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Devices is the map of devices that are known by the Device
|
||||
// Plugin manager with the kind of the devices as key
|
||||
func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
|
||||
@ -335,10 +373,41 @@ func (m *ManagerImpl) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
||||
existingDevs := make(map[string]pluginapi.Device)
|
||||
func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) {
|
||||
chanForAckOfNotification := make(chan bool)
|
||||
|
||||
new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
|
||||
return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
|
||||
}
|
||||
|
||||
options, err := new.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get device plugin options: %v", err)
|
||||
return nil, fmt.Errorf("Failed to get device plugin options: %v", err)
|
||||
}
|
||||
m.registerEndpoint(resourceName, options, new)
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-chanForAckOfNotification:
|
||||
close(chanForAckOfNotification)
|
||||
m.runEndpoint(resourceName, new)
|
||||
case <-time.After(time.Second):
|
||||
glog.Errorf("Timed out while waiting for notification ack from plugin")
|
||||
}
|
||||
}()
|
||||
return chanForAckOfNotification, nil
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e *endpointImpl) {
|
||||
m.mutex.Lock()
|
||||
old, ok := m.endpoints[r.ResourceName]
|
||||
defer m.mutex.Unlock()
|
||||
if options != nil {
|
||||
m.pluginOpts[resourceName] = options
|
||||
}
|
||||
old, ok := m.endpoints[resourceName]
|
||||
if ok && old != nil {
|
||||
// Pass devices of previous endpoint into re-registered one,
|
||||
// to avoid potential orphaned devices upon re-registration
|
||||
@ -347,49 +416,40 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
||||
device.Health = pluginapi.Unhealthy
|
||||
devices[device.ID] = device
|
||||
}
|
||||
existingDevs = devices
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
|
||||
socketPath := filepath.Join(m.socketdir, r.Endpoint)
|
||||
e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
|
||||
return
|
||||
}
|
||||
m.mutex.Lock()
|
||||
if r.Options != nil {
|
||||
m.pluginOpts[r.ResourceName] = r.Options
|
||||
}
|
||||
// Check for potential re-registration during the initialization of new endpoint,
|
||||
// and skip updating if re-registration happens.
|
||||
// TODO: simplify the part once we have a better way to handle registered devices
|
||||
ext := m.endpoints[r.ResourceName]
|
||||
if ext != old {
|
||||
glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)
|
||||
m.mutex.Unlock()
|
||||
e.stop()
|
||||
return
|
||||
e.devices = devices
|
||||
}
|
||||
// Associates the newly created endpoint with the corresponding resource name.
|
||||
// Stops existing endpoint if there is any.
|
||||
m.endpoints[r.ResourceName] = e
|
||||
m.endpoints[resourceName] = e
|
||||
glog.V(2).Infof("Registered endpoint %v", e)
|
||||
m.mutex.Unlock()
|
||||
|
||||
if old != nil {
|
||||
old.stop()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) {
|
||||
e.run()
|
||||
e.stop()
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
if old, ok := m.endpoints[resourceName]; ok && old == e {
|
||||
m.markResourceUnhealthy(resourceName)
|
||||
}
|
||||
glog.V(2).Infof("Unregistered endpoint %v", e)
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
||||
new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
|
||||
return
|
||||
}
|
||||
m.registerEndpoint(r.ResourceName, r.Options, new)
|
||||
|
||||
go func() {
|
||||
e.run()
|
||||
e.stop()
|
||||
m.mutex.Lock()
|
||||
if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
|
||||
m.markResourceUnhealthy(r.ResourceName)
|
||||
}
|
||||
glog.V(2).Infof("Unregistered endpoint %v", e)
|
||||
m.mutex.Unlock()
|
||||
m.runEndpoint(r.ResourceName, new)
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
)
|
||||
|
||||
@ -61,3 +62,10 @@ func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
|
||||
func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
||||
return nil, nil, []string{}
|
||||
}
|
||||
|
||||
// GetWatcherCallback returns plugin watcher callback
|
||||
func (h *ManagerStub) GetWatcherCallback() pluginwatcher.RegisterCallbackFn {
|
||||
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -33,8 +34,10 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
)
|
||||
|
||||
@ -65,31 +68,16 @@ func TestNewManagerImplStart(t *testing.T) {
|
||||
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(socketDir)
|
||||
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
|
||||
cleanup(t, m, p)
|
||||
// Stop should tolerate being called more than once.
|
||||
cleanup(t, m, p)
|
||||
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false)
|
||||
cleanup(t, m, p, nil)
|
||||
}
|
||||
|
||||
func TestNewManagerImplStop(t *testing.T) {
|
||||
func TestNewManagerImplStartProbeMode(t *testing.T) {
|
||||
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(socketDir)
|
||||
|
||||
m, err := newManagerImpl(socketName)
|
||||
require.NoError(t, err)
|
||||
// No prior Start, but that should be okay.
|
||||
err = m.Stop()
|
||||
require.NoError(t, err)
|
||||
|
||||
devs := []*pluginapi.Device{
|
||||
{ID: "Dev1", Health: pluginapi.Healthy},
|
||||
{ID: "Dev2", Health: pluginapi.Healthy},
|
||||
}
|
||||
p := NewDevicePluginStub(devs, pluginSocketName)
|
||||
// Same here.
|
||||
err = p.Stop()
|
||||
require.NoError(t, err)
|
||||
m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
|
||||
cleanup(t, m, p, w)
|
||||
}
|
||||
|
||||
// Tests that the device plugin manager correctly handles registration and re-registration by
|
||||
@ -118,9 +106,9 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
}
|
||||
callbackChan <- callbackCount
|
||||
}
|
||||
m, p1 := setup(t, devs, callback, socketName, pluginSocketName)
|
||||
m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag)
|
||||
atomic.StoreInt32(&expCallbackCount, 1)
|
||||
p1.Register(socketName, testResourceName, preStartContainerFlag)
|
||||
p1.Register(socketName, testResourceName, "")
|
||||
// Wait for the first callback to be issued.
|
||||
|
||||
select {
|
||||
@ -132,11 +120,11 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
devices := m.Devices()
|
||||
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
|
||||
|
||||
p2 := NewDevicePluginStub(devs, pluginSocketName+".new")
|
||||
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
|
||||
err = p2.Start()
|
||||
require.NoError(t, err)
|
||||
atomic.StoreInt32(&expCallbackCount, 2)
|
||||
p2.Register(socketName, testResourceName, preStartContainerFlag)
|
||||
p2.Register(socketName, testResourceName, "")
|
||||
// Wait for the second callback to be issued.
|
||||
select {
|
||||
case <-callbackChan:
|
||||
@ -149,11 +137,11 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
|
||||
|
||||
// Test the scenario that a plugin re-registers with different devices.
|
||||
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third")
|
||||
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
|
||||
err = p3.Start()
|
||||
require.NoError(t, err)
|
||||
atomic.StoreInt32(&expCallbackCount, 3)
|
||||
p3.Register(socketName, testResourceName, preStartContainerFlag)
|
||||
p3.Register(socketName, testResourceName, "")
|
||||
// Wait for the second callback to be issued.
|
||||
select {
|
||||
case <-callbackChan:
|
||||
@ -161,16 +149,93 @@ func TestDevicePluginReRegistration(t *testing.T) {
|
||||
case <-time.After(time.Second):
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
devices3 := m.Devices()
|
||||
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
|
||||
p2.Stop()
|
||||
p3.Stop()
|
||||
cleanup(t, m, p1)
|
||||
cleanup(t, m, p1, nil)
|
||||
close(callbackChan)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub) {
|
||||
// Tests that the device plugin manager correctly handles registration and re-registration by
|
||||
// making sure that after registration, devices are correctly updated and if a re-registration
|
||||
// happens, we will NOT delete devices; and no orphaned devices left.
|
||||
// While testing above scenario, plugin discovery and registration will be done using
|
||||
// Kubelet probe based mechanism
|
||||
func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
|
||||
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(socketDir)
|
||||
devs := []*pluginapi.Device{
|
||||
{ID: "Dev1", Health: pluginapi.Healthy},
|
||||
{ID: "Dev2", Health: pluginapi.Healthy},
|
||||
}
|
||||
devsForRegistration := []*pluginapi.Device{
|
||||
{ID: "Dev3", Health: pluginapi.Healthy},
|
||||
}
|
||||
|
||||
expCallbackCount := int32(0)
|
||||
callbackCount := int32(0)
|
||||
callbackChan := make(chan int32)
|
||||
callback := func(n string, a, u, r []pluginapi.Device) {
|
||||
callbackCount++
|
||||
if callbackCount > atomic.LoadInt32(&expCallbackCount) {
|
||||
t.FailNow()
|
||||
}
|
||||
callbackChan <- callbackCount
|
||||
}
|
||||
m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName)
|
||||
atomic.StoreInt32(&expCallbackCount, 1)
|
||||
// Wait for the first callback to be issued.
|
||||
select {
|
||||
case <-callbackChan:
|
||||
break
|
||||
case <-time.After(time.Second):
|
||||
t.FailNow()
|
||||
}
|
||||
devices := m.Devices()
|
||||
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
|
||||
|
||||
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
|
||||
err = p2.Start()
|
||||
require.NoError(t, err)
|
||||
atomic.StoreInt32(&expCallbackCount, 2)
|
||||
// Wait for the second callback to be issued.
|
||||
select {
|
||||
case <-callbackChan:
|
||||
break
|
||||
case <-time.After(time.Second):
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
devices2 := m.Devices()
|
||||
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
|
||||
|
||||
// Test the scenario that a plugin re-registers with different devices.
|
||||
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
|
||||
err = p3.Start()
|
||||
require.NoError(t, err)
|
||||
atomic.StoreInt32(&expCallbackCount, 3)
|
||||
// Wait for the second callback to be issued.
|
||||
select {
|
||||
case <-callbackChan:
|
||||
break
|
||||
case <-time.After(time.Second):
|
||||
t.FailNow()
|
||||
}
|
||||
|
||||
devices3 := m.Devices()
|
||||
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
|
||||
p2.Stop()
|
||||
p3.Stop()
|
||||
cleanup(t, m, p1, w)
|
||||
close(callbackChan)
|
||||
}
|
||||
|
||||
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) {
|
||||
m, err := newManagerImpl(socketName)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -182,16 +247,43 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc
|
||||
err = m.Start(activePods, &sourcesReadyStub{})
|
||||
require.NoError(t, err)
|
||||
|
||||
p := NewDevicePluginStub(devs, pluginSocketName)
|
||||
p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag)
|
||||
err = p.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
return m, p
|
||||
}
|
||||
|
||||
func cleanup(t *testing.T, m Manager, p *Stub) {
|
||||
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) {
|
||||
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
|
||||
|
||||
m, err := newManagerImpl(socketName)
|
||||
require.NoError(t, err)
|
||||
|
||||
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback())
|
||||
w.Start()
|
||||
|
||||
m.callback = callback
|
||||
|
||||
activePods := func() []*v1.Pod {
|
||||
return []*v1.Pod{}
|
||||
}
|
||||
err = m.Start(activePods, &sourcesReadyStub{})
|
||||
require.NoError(t, err)
|
||||
|
||||
p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/)
|
||||
err = p.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
return m, p, &w
|
||||
}
|
||||
|
||||
func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) {
|
||||
p.Stop()
|
||||
m.Stop()
|
||||
if w != nil {
|
||||
require.NoError(t, w.Stop())
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateCapacityAllocatable(t *testing.T) {
|
||||
|
@ -24,6 +24,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/config"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
||||
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
||||
)
|
||||
|
||||
@ -58,6 +59,7 @@ type Manager interface {
|
||||
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
|
||||
// and inactive device plugin resources previously registered on the node.
|
||||
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
|
||||
GetWatcherCallback() watcher.RegisterCallbackFn
|
||||
}
|
||||
|
||||
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
|
||||
|
@ -58,6 +58,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
|
||||
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
|
||||
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
|
||||
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
@ -1307,6 +1308,8 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
|
||||
if kl.enablePluginsWatcher {
|
||||
// Adding Registration Callback function for CSI Driver
|
||||
kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback)
|
||||
// Adding Registration Callback function for Device Manager
|
||||
kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback())
|
||||
// Start the plugin watcher
|
||||
glog.V(4).Infof("starting watcher")
|
||||
if err := kl.pluginWatcher.Start(); err != nil {
|
||||
|
@ -26,9 +26,10 @@ import (
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
@ -40,14 +41,28 @@ import (
|
||||
|
||||
const (
|
||||
// fake resource name
|
||||
resourceName = "fake.com/resource"
|
||||
resourceName = "fake.com/resource"
|
||||
resourceNameWithProbeSupport = "fake.com/resource2"
|
||||
)
|
||||
|
||||
// Serial because the test restarts Kubelet
|
||||
var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature:DevicePlugin][Serial]", func() {
|
||||
f := framework.NewDefaultFramework("device-plugin-errors")
|
||||
testDevicePlugin(f, false, pluginapi.DevicePluginPath)
|
||||
})
|
||||
|
||||
var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:DevicePluginProbe][Serial]", func() {
|
||||
f := framework.NewDefaultFramework("device-plugin-errors")
|
||||
testDevicePlugin(f, true, "/var/lib/kubelet/plugins/")
|
||||
})
|
||||
|
||||
func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSockDir string) {
|
||||
Context("DevicePlugin", func() {
|
||||
By("Enabling support for Kubelet Plugins Watcher")
|
||||
tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
|
||||
initialConfig.FeatureGates[string(features.KubeletPluginsWatcher)] = enablePluginWatcher
|
||||
})
|
||||
//devicePluginSockPaths := []string{pluginapi.DevicePluginPath}
|
||||
It("Verifies the Kubelet device plugin functionality.", func() {
|
||||
By("Start stub device plugin")
|
||||
// fake devices for e2e test
|
||||
@ -56,15 +71,16 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
|
||||
{ID: "Dev-2", Health: pluginapi.Healthy},
|
||||
}
|
||||
|
||||
socketPath := pluginapi.DevicePluginPath + "dp." + fmt.Sprintf("%d", time.Now().Unix())
|
||||
socketPath := pluginSockDir + "dp." + fmt.Sprintf("%d", time.Now().Unix())
|
||||
framework.Logf("socketPath %v", socketPath)
|
||||
|
||||
dp1 := dm.NewDevicePluginStub(devs, socketPath)
|
||||
dp1 := dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
|
||||
dp1.SetAllocFunc(stubAllocFunc)
|
||||
err := dp1.Start()
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
By("Register resources")
|
||||
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
|
||||
err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
By("Waiting for the resource exported by the stub device plugin to become available on the local node")
|
||||
@ -103,13 +119,13 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
|
||||
By("Wait for node is ready")
|
||||
framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
|
||||
|
||||
By("Re-Register resources after kubelet restart")
|
||||
dp1 = dm.NewDevicePluginStub(devs, socketPath)
|
||||
By("Re-Register resources")
|
||||
dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
|
||||
dp1.SetAllocFunc(stubAllocFunc)
|
||||
err = dp1.Start()
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
|
||||
err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginSockDir)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
By("Waiting for resource to become available on the local node after re-registration")
|
||||
@ -149,12 +165,12 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
|
||||
Expect(devIdRestart2).To(Equal(devId2))
|
||||
|
||||
By("Re-register resources")
|
||||
dp1 = dm.NewDevicePluginStub(devs, socketPath)
|
||||
dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
|
||||
dp1.SetAllocFunc(stubAllocFunc)
|
||||
err = dp1.Start()
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false)
|
||||
err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginSockDir)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
By("Waiting for the resource exported by the stub device plugin to become healthy on the local node")
|
||||
@ -192,7 +208,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
|
||||
f.PodClient().DeleteSync(pod2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// makeBusyboxPod returns a simple Pod spec with a busybox container
|
||||
// that requests resourceName and runs the specified command.
|
||||
|
Loading…
Reference in New Issue
Block a user