Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-22 03:11:40 +00:00
Merge pull request #65948 from figo/stateless
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

**Simplify device manager: make endpoint stateless**

While reviewing the devicemanager code, I found that the caching layer on the endpoint is redundant. The three related objects are:

**devicemanager <-> endpoint <-> plugin**

The plugin is the source of truth for devices and device health status. The devicemanager maintains healthyDevices, unhealthyDevices, and allocatedDevices based on updates from the plugin. So there is no point in the endpoint caching devices as well; this patch removes that cache layer and the endpoint becomes stateless, which I believe is how it should be (but I do welcome review in case I missed something here).

It also removes Manager.Devices(), since I didn't find any caller of it other than tests. If we need to get all devices from the manager in the future, it just needs to return healthyDevices + unhealthyDevices, so there is no need to call into the endpoint at all.

This patch makes the code more readable and simplifies the data model.

**What this PR does / why we need it**: this patch simplifies the device manager code and makes it more maintainable.

**Which issue(s) this PR fixes**: this is a refactor of the device manager code.

**Special notes for your reviewer**: will need to rebase this code if #58755 gets checked in first.

**Release note**:

```release-note
None
```

/sig node
/cc @jiayingz @RenaudWasTaken @vishh @saad-ali @vikaschoudhary16 @vladimirvivien @anfernee
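For orientation, here is a minimal, self-contained sketch of the data flow this PR settles on — not the kubelet source; `Device`, `manager`, and the map-based sets are simplified stand-ins for `pluginapi.Device`, `ManagerImpl`, and `sets.String`. The endpoint forwards each full ListAndWatch snapshot through a single callback, and the manager rebuilds its healthy/unhealthy sets from scratch on every update:

```go
package main

import "fmt"

// Device stands in for pluginapi.Device, trimmed to the two fields that matter here.
type Device struct {
	ID     string
	Health string // "Healthy" or "Unhealthy"
}

// monitorCallback matches the simplified signature introduced by this PR:
// the endpoint hands over the complete device list, not added/updated/deleted diffs.
type monitorCallback func(resourceName string, devices []Device)

// manager is a toy stand-in for ManagerImpl: it keeps only the derived sets.
type manager struct {
	healthy   map[string]map[string]bool // resourceName -> device IDs
	unhealthy map[string]map[string]bool
}

// genericDeviceUpdateCallback rebuilds both sets from the full snapshot.
// This is what makes a stateless endpoint safe: a device absent from the
// new list is simply never re-inserted, so deletions fall out for free.
func (m *manager) genericDeviceUpdateCallback(resourceName string, devices []Device) {
	m.healthy[resourceName] = map[string]bool{}
	m.unhealthy[resourceName] = map[string]bool{}
	for _, d := range devices {
		if d.Health == "Healthy" {
			m.healthy[resourceName][d.ID] = true
		} else {
			m.unhealthy[resourceName][d.ID] = true
		}
	}
}

func main() {
	m := &manager{
		healthy:   map[string]map[string]bool{},
		unhealthy: map[string]map[string]bool{},
	}
	var cb monitorCallback = m.genericDeviceUpdateCallback

	// First snapshot from the plugin: two healthy devices.
	cb("vendor.com/gpu", []Device{{"dev1", "Healthy"}, {"dev2", "Healthy"}})
	// Next snapshot: dev2 went unhealthy and dev1 disappeared entirely.
	cb("vendor.com/gpu", []Device{{"dev2", "Unhealthy"}})

	fmt.Println(len(m.healthy["vendor.com/gpu"]), len(m.unhealthy["vendor.com/gpu"])) // 0 1
}
```

Because the sets are rebuilt from each snapshot rather than patched incrementally, no per-endpoint device cache is needed to detect removals.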
Commit 742a90cb5a
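The first hunks below delete the endpoint's device map and mutex. As a hedged sketch of what the simplified receive loop reduces to (assumed, simplified types; the real code reads from a gRPC ListAndWatch stream rather than a channel):

```go
package main

import "fmt"

// Device stands in for pluginapi.Device, reduced to ID and Health.
type Device struct{ ID, Health string }

// endpoint mirrors the post-patch endpointImpl: no device map and no mutex
// guarding device state — just the resource name and the manager's callback.
type endpoint struct {
	resourceName string
	cb           func(resourceName string, devices []Device)
}

// run sketches the simplified loop: each update from the plugin is forwarded
// wholesale to the callback, with no diffing against a local cache.
func (e *endpoint) run(updates <-chan []Device) {
	for devs := range updates {
		var newDevs []Device
		newDevs = append(newDevs, devs...)
		e.cb(e.resourceName, newDevs)
	}
}

func main() {
	updates := make(chan []Device, 1)
	e := &endpoint{
		resourceName: "vendor.com/gpu",
		cb: func(name string, devices []Device) {
			fmt.Printf("%s: %d device(s) reported\n", name, len(devices))
		},
	}
	updates <- []Device{{ID: "dev1", Health: "Healthy"}}
	close(updates)
	e.run(updates) // prints: vendor.com/gpu: 1 device(s) reported
}
```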
@@ -37,8 +37,7 @@ type endpoint interface {
 	stop()
 	allocate(devs []string) (*pluginapi.AllocateResponse, error)
 	preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
-	getDevices() []pluginapi.Device
-	callback(resourceName string, added, updated, deleted []pluginapi.Device)
+	callback(resourceName string, devices []pluginapi.Device)
 	isStopped() bool
 	stopGracePeriodExpired() bool
 }
@@ -51,15 +50,13 @@ type endpointImpl struct {
 	resourceName string
 	stopTime     time.Time
 
-	devices map[string]pluginapi.Device
-	mutex   sync.Mutex
-
-	cb monitorCallback
+	mutex sync.Mutex
+	cb    monitorCallback
 }
 
 // newEndpoint creates a new endpoint for the given resourceName.
 // This is to be used during normal device plugin registration.
-func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
+func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
 	client, c, err := dial(socketPath)
 	if err != nil {
 		glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
@@ -73,41 +70,26 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina
 		socketPath:   socketPath,
 		resourceName: resourceName,
 
-		devices: devices,
-		cb:      callback,
+		cb: callback,
 	}, nil
 }
 
 // newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set.
 // This is to be used during Kubelet restart, before the actual device plugin re-registers.
-func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl {
+func newStoppedEndpointImpl(resourceName string) *endpointImpl {
 	return &endpointImpl{
 		resourceName: resourceName,
-		devices:      devices,
 		stopTime:     time.Now(),
 	}
 }
 
-func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
-	e.cb(resourceName, added, updated, deleted)
-}
-
-func (e *endpointImpl) getDevices() []pluginapi.Device {
-	e.mutex.Lock()
-	defer e.mutex.Unlock()
-	var devs []pluginapi.Device
-
-	for _, d := range e.devices {
-		devs = append(devs, d)
-	}
-
-	return devs
+func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) {
+	e.cb(resourceName, devices)
 }
 
 // run initializes ListAndWatch gRPC call for the device plugin and
 // blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
-// stream update contains a new list of device states. listAndWatch compares the new
-// device states with its cached states to get list of new, updated, and deleted devices.
+// stream update contains a new list of device states.
 // It then issues a callback to pass this information to the device manager which
 // will adjust the resource available information accordingly.
 func (e *endpointImpl) run() {
@@ -118,14 +100,6 @@ func (e *endpointImpl) run() {
 		return
 	}
 
-	devices := make(map[string]pluginapi.Device)
-
-	e.mutex.Lock()
-	for _, d := range e.devices {
-		devices[d.ID] = d
-	}
-	e.mutex.Unlock()
-
 	for {
 		response, err := stream.Recv()
 		if err != nil {
@@ -136,57 +110,12 @@ func (e *endpointImpl) run() {
 		devs := response.Devices
 		glog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
 
-		newDevs := make(map[string]*pluginapi.Device)
-		var added, updated []pluginapi.Device
-
+		var newDevs []pluginapi.Device
 		for _, d := range devs {
-			dOld, ok := devices[d.ID]
-			newDevs[d.ID] = d
-
-			if !ok {
-				glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d)
-
-				devices[d.ID] = *d
-				added = append(added, *d)
-
-				continue
-			}
-
-			if d.Health == dOld.Health {
-				continue
-			}
-
-			if d.Health == pluginapi.Unhealthy {
-				glog.Errorf("Device %s is now Unhealthy", d.ID)
-			} else if d.Health == pluginapi.Healthy {
-				glog.V(2).Infof("Device %s is now Healthy", d.ID)
-			}
-
-			devices[d.ID] = *d
-			updated = append(updated, *d)
+			newDevs = append(newDevs, *d)
 		}
 
-		var deleted []pluginapi.Device
-		for id, d := range devices {
-			if _, ok := newDevs[id]; ok {
-				continue
-			}
-
-			glog.Errorf("Device %s was deleted", d.ID)
-
-			deleted = append(deleted, d)
-			delete(devices, id)
-		}
-
-		e.mutex.Lock()
-		// NOTE: Return a copy of 'devices' instead of returning a direct reference to local 'devices'
-		e.devices = make(map[string]pluginapi.Device)
-		for _, d := range devices {
-			e.devices[d.ID] = d
-		}
-		e.mutex.Unlock()
-
-		e.callback(e.resourceName, added, updated, deleted)
+		e.callback(e.resourceName, newDevs)
 	}
 }
@@ -37,7 +37,7 @@ func TestNewEndpoint(t *testing.T) {
 		{ID: "ADeviceId", Health: pluginapi.Healthy},
 	}
 
-	p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
+	p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {})
 	defer ecleanup(t, p, e)
 }
 
@@ -58,7 +58,7 @@ func TestRun(t *testing.T) {
 
 	callbackCount := 0
 	callbackChan := make(chan int)
-	callback := func(n string, a, u, r []pluginapi.Device) {
+	callback := func(n string, devices []pluginapi.Device) {
 		// Should be called twice:
 		// one for plugin registration, one for plugin update.
 		if callbackCount > 2 {
@@ -67,23 +67,24 @@ func TestRun(t *testing.T) {
 
 		// Check plugin registration
 		if callbackCount == 0 {
-			require.Len(t, a, 3)
-			require.Len(t, u, 0)
-			require.Len(t, r, 0)
+			require.Len(t, devices, 3)
+			require.Equal(t, devices[0].ID, devs[0].ID)
+			require.Equal(t, devices[1].ID, devs[1].ID)
+			require.Equal(t, devices[2].ID, devs[2].ID)
+			require.Equal(t, devices[0].Health, devs[0].Health)
+			require.Equal(t, devices[1].Health, devs[1].Health)
+			require.Equal(t, devices[2].Health, devs[2].Health)
 		}
 
 		// Check plugin update
 		if callbackCount == 1 {
-			require.Len(t, a, 1)
-			require.Len(t, u, 2)
-			require.Len(t, r, 1)
-
-			require.Equal(t, a[0].ID, updated[2].ID)
-			require.Equal(t, u[0].ID, updated[0].ID)
-			require.Equal(t, u[0].Health, updated[0].Health)
-			require.Equal(t, u[1].ID, updated[1].ID)
-			require.Equal(t, u[1].Health, updated[1].Health)
-			require.Equal(t, r[0].ID, devs[1].ID)
+			require.Len(t, devices, 3)
+			require.Equal(t, devices[0].ID, updated[0].ID)
+			require.Equal(t, devices[1].ID, updated[1].ID)
+			require.Equal(t, devices[2].ID, updated[2].ID)
+			require.Equal(t, devices[0].Health, updated[0].Health)
+			require.Equal(t, devices[1].Health, updated[1].Health)
+			require.Equal(t, devices[2].Health, updated[2].Health)
 		}
 
 		callbackCount++
@@ -102,18 +103,7 @@ func TestRun(t *testing.T) {
 	// Wait for the second callback to be issued.
 	<-callbackChan
 
-	e.mutex.Lock()
-	defer e.mutex.Unlock()
-
-	require.Len(t, e.devices, 3)
-	for _, dref := range updated {
-		d, ok := e.devices[dref.ID]
-
-		require.True(t, ok)
-		require.Equal(t, d.ID, dref.ID)
-		require.Equal(t, d.Health, dref.Health)
-	}
-
 	require.Equal(t, callbackCount, 2)
 }
 
@@ -123,7 +113,7 @@ func TestAllocate(t *testing.T) {
 	}
 	callbackCount := 0
 	callbackChan := make(chan int)
-	p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {
+	p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {
 		callbackCount++
 		callbackChan <- callbackCount
 	})
@@ -169,23 +159,13 @@ func TestAllocate(t *testing.T) {
 	require.Equal(t, resp, respOut)
 }
 
-func TestGetDevices(t *testing.T) {
-	e := endpointImpl{
-		devices: map[string]pluginapi.Device{
-			"ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy},
-		},
-	}
-	devs := e.getDevices()
-	require.Len(t, devs, 1)
-}
-
 func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
 	p := NewDevicePluginStub(devs, socket, resourceName, false)
 
 	err := p.Start()
 	require.NoError(t, err)
 
-	e, err := newEndpointImpl(socket, resourceName, make(map[string]pluginapi.Device), callback)
+	e, err := newEndpointImpl(socket, resourceName, callback)
 	require.NoError(t, err)
 
 	return p, e
@@ -49,7 +49,7 @@ type ActivePodsFunc func() []*v1.Pod
 // monitorCallback is the function called when a device's health state changes,
 // or new devices are reported, or old devices are deleted.
 // Updated contains the most recent state of the Device.
-type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
+type monitorCallback func(resourceName string, devices []pluginapi.Device)
 
 // ManagerImpl is the structure in charge of managing Device Plugins.
 type ManagerImpl struct {
@@ -133,28 +133,17 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
 	return manager, nil
 }
 
-func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
-	kept := append(updated, added...)
+func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
 	m.mutex.Lock()
-	if _, ok := m.healthyDevices[resourceName]; !ok {
-		m.healthyDevices[resourceName] = sets.NewString()
-	}
-	if _, ok := m.unhealthyDevices[resourceName]; !ok {
-		m.unhealthyDevices[resourceName] = sets.NewString()
-	}
-	for _, dev := range kept {
+	m.healthyDevices[resourceName] = sets.NewString()
+	m.unhealthyDevices[resourceName] = sets.NewString()
+	for _, dev := range devices {
 		if dev.Health == pluginapi.Healthy {
 			m.healthyDevices[resourceName].Insert(dev.ID)
-			m.unhealthyDevices[resourceName].Delete(dev.ID)
 		} else {
 			m.unhealthyDevices[resourceName].Insert(dev.ID)
-			m.healthyDevices[resourceName].Delete(dev.ID)
 		}
 	}
-	for _, dev := range deleted {
-		m.healthyDevices[resourceName].Delete(dev.ID)
-		m.unhealthyDevices[resourceName].Delete(dev.ID)
-	}
 	m.mutex.Unlock()
 	m.writeCheckpoint()
 }
@@ -277,21 +266,6 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
 	return false
 }
 
-// Devices is the map of devices that are known by the Device
-// Plugin manager with the kind of the devices as key
-func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
-
-	devs := make(map[string][]pluginapi.Device)
-	for k, e := range m.endpoints {
-		glog.V(3).Infof("Endpoint: %+v: %p", k, e)
-		devs[k] = e.getDevices()
-	}
-
-	return devs
-}
-
 // Allocate is the call that you can use to allocate a set of devices
 // from the registered device plugins.
 func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
@@ -376,7 +350,7 @@ func (m *ManagerImpl) Stop() error {
 func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) {
 	chanForAckOfNotification := make(chan bool)
 
-	new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback)
+	new, err := newEndpointImpl(socketPath, resourceName, m.callback)
 	if err != nil {
 		glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
 		return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
@@ -407,26 +381,8 @@ func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.D
 	if options != nil {
 		m.pluginOpts[resourceName] = options
 	}
-	old, ok := m.endpoints[resourceName]
-	if ok && old != nil {
-		// Pass devices of previous endpoint into re-registered one,
-		// to avoid potential orphaned devices upon re-registration
-		devices := make(map[string]pluginapi.Device)
-		for _, device := range old.getDevices() {
-			device.Health = pluginapi.Unhealthy
-			devices[device.ID] = device
-		}
-		e.devices = devices
-	}
+	// Associates the newly created endpoint with the corresponding resource name.
+	// Stops existing endpoint if there is any.
 	m.endpoints[resourceName] = e
 	glog.V(2).Infof("Registered endpoint %v", e)
-
-	if old != nil {
-		old.stop()
-	}
-	return
 }
 
@@ -441,13 +397,12 @@ func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) {
 }
 
 func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
-	new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback)
+	new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
 	if err != nil {
 		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
 		return
 	}
 	m.registerEndpoint(r.ResourceName, r.Options, new)
 
 	go func() {
 		m.runEndpoint(r.ResourceName, new)
 	}()
@@ -567,7 +522,7 @@ func (m *ManagerImpl) readCheckpoint() error {
 		// will stay zero till the corresponding device plugin re-registers.
 		m.healthyDevices[resource] = sets.NewString()
 		m.unhealthyDevices[resource] = sets.NewString()
-		m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device))
+		m.endpoints[resource] = newStoppedEndpointImpl(resource)
 	}
 	return nil
 }
@@ -18,7 +18,6 @@ package devicemanager
 
 import (
 	"k8s.io/api/core/v1"
-	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
@@ -43,11 +42,6 @@ func (h *ManagerStub) Stop() error {
 	return nil
 }
 
-// Devices returns an empty map.
-func (h *ManagerStub) Devices() map[string][]pluginapi.Device {
-	return make(map[string][]pluginapi.Device)
-}
-
 // Allocate simply returns nil.
 func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
 	return nil
@@ -22,7 +22,6 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
-	"sync/atomic"
 	"testing"
 	"time"
 
@@ -68,7 +67,9 @@ func TestNewManagerImplStart(t *testing.T) {
 	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
 	require.NoError(t, err)
 	defer os.RemoveAll(socketDir)
-	m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false)
+	m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
 	cleanup(t, m, p, nil)
+	// Stop should tolerate being called more than once.
+	cleanup(t, m, p, nil)
 }
 
@@ -76,7 +77,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) {
 	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
 	require.NoError(t, err)
 	defer os.RemoveAll(socketDir)
-	m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
+	m, _, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
 	cleanup(t, m, p, w)
 }
 
@@ -95,68 +96,55 @@ func TestDevicePluginReRegistration(t *testing.T) {
 		{ID: "Dev3", Health: pluginapi.Healthy},
 	}
 	for _, preStartContainerFlag := range []bool{false, true} {
-
-		expCallbackCount := int32(0)
-		callbackCount := int32(0)
-		callbackChan := make(chan int32)
-		callback := func(n string, a, u, r []pluginapi.Device) {
-			callbackCount++
-			if callbackCount > atomic.LoadInt32(&expCallbackCount) {
-				t.FailNow()
-			}
-			callbackChan <- callbackCount
-		}
-		m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag)
-		atomic.StoreInt32(&expCallbackCount, 1)
+		m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
 		p1.Register(socketName, testResourceName, "")
 		// Wait for the first callback to be issued.
 
 		select {
-		case <-callbackChan:
-			break
-		case <-time.After(time.Second):
-			t.FailNow()
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout while waiting for manager update")
 		}
-		devices := m.Devices()
-		require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
+		capacity, allocatable, _ := m.GetCapacity()
+		resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
+		resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
+		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+		require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
 
 		p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
 		err = p2.Start()
 		require.NoError(t, err)
-		atomic.StoreInt32(&expCallbackCount, 2)
 		p2.Register(socketName, testResourceName, "")
 		// Wait for the second callback to be issued.
-		select {
-		case <-callbackChan:
-			break
-		case <-time.After(time.Second):
-			t.FailNow()
-		}
-
-		devices2 := m.Devices()
-		require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout while waiting for manager update")
+		}
+		capacity, allocatable, _ = m.GetCapacity()
+		resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+		resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+		require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
 
 		// Test the scenario that a plugin re-registers with different devices.
 		p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
 		err = p3.Start()
 		require.NoError(t, err)
-		atomic.StoreInt32(&expCallbackCount, 3)
 		p3.Register(socketName, testResourceName, "")
 		// Wait for the second callback to be issued.
-		select {
-		case <-callbackChan:
-			break
-		case <-time.After(time.Second):
-			t.FailNow()
-		}
-
-		devices3 := m.Devices()
-		require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout while waiting for manager update")
+		}
+		capacity, allocatable, _ = m.GetCapacity()
+		resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+		resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+		require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
 		p2.Stop()
 		p3.Stop()
 		cleanup(t, m, p1, nil)
-		close(callbackChan)
-
 	}
 }
 
@@ -177,105 +165,106 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
 		{ID: "Dev3", Health: pluginapi.Healthy},
 	}
 
-	expCallbackCount := int32(0)
-	callbackCount := int32(0)
-	callbackChan := make(chan int32)
-	callback := func(n string, a, u, r []pluginapi.Device) {
-		callbackCount++
-		if callbackCount > atomic.LoadInt32(&expCallbackCount) {
-			t.FailNow()
-		}
-		callbackChan <- callbackCount
-	}
-	m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName)
-	atomic.StoreInt32(&expCallbackCount, 1)
+	m, ch, p1, w := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
 
 	// Wait for the first callback to be issued.
 	select {
-	case <-callbackChan:
-		break
-	case <-time.After(time.Second):
+	case <-ch:
+	case <-time.After(5 * time.Second):
 		t.FailNow()
 	}
-	devices := m.Devices()
-	require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
+	capacity, allocatable, _ := m.GetCapacity()
+	resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
+	resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
+	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
 
 	p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
 	err = p2.Start()
 	require.NoError(t, err)
-	atomic.StoreInt32(&expCallbackCount, 2)
 	// Wait for the second callback to be issued.
 	select {
-	case <-callbackChan:
-		break
-	case <-time.After(time.Second):
+	case <-ch:
+	case <-time.After(5 * time.Second):
 		t.FailNow()
 	}
 
-	devices2 := m.Devices()
-	require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
+	capacity, allocatable, _ = m.GetCapacity()
+	resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+	resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
 
 	// Test the scenario that a plugin re-registers with different devices.
 	p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
 	err = p3.Start()
 	require.NoError(t, err)
-	atomic.StoreInt32(&expCallbackCount, 3)
-	// Wait for the second callback to be issued.
+	// Wait for the third callback to be issued.
 	select {
-	case <-callbackChan:
-		break
-	case <-time.After(time.Second):
+	case <-ch:
+	case <-time.After(5 * time.Second):
 		t.FailNow()
 	}
 
-	devices3 := m.Devices()
-	require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
+	capacity, allocatable, _ = m.GetCapacity()
+	resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+	resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+	require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed")
 	p2.Stop()
 	p3.Stop()
 	cleanup(t, m, p1, w)
-	close(callbackChan)
 }
 
-func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) {
+func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
 	m, err := newManagerImpl(socketName)
 	require.NoError(t, err)
+	updateChan := make(chan interface{})
 
-	m.callback = callback
+	if callback != nil {
+		m.callback = callback
+	}
+
+	originalCallback := m.callback
+	m.callback = func(resourceName string, devices []pluginapi.Device) {
+		originalCallback(resourceName, devices)
+		updateChan <- new(interface{})
+	}
 	activePods := func() []*v1.Pod {
 		return []*v1.Pod{}
 	}
 
 	err = m.Start(activePods, &sourcesReadyStub{})
 	require.NoError(t, err)
 
-	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag)
-	err = p.Start()
-	require.NoError(t, err)
-
-	return m, p
+	return m, updateChan
 }
 
-func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) {
-	w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
-
-	m, err := newManagerImpl(socketName)
-	require.NoError(t, err)
+func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub {
+	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false)
+	err := p.Start()
+	require.NoError(t, err)
+	return p
+}
+
+func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
+	w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
 	w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback())
 	w.Start()
+	return &w
+}
 
-	m.callback = callback
+func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
+	m, updateChan := setupDeviceManager(t, devs, callback, socketName)
+	p := setupDevicePlugin(t, devs, pluginSocketName)
+	return m, updateChan, p
+}
 
-	activePods := func() []*v1.Pod {
-		return []*v1.Pod{}
-	}
-	err = m.Start(activePods, &sourcesReadyStub{})
-	require.NoError(t, err)
-
-	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/)
-	err = p.Start()
-	require.NoError(t, err)
-
-	return m, p, &w
+func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, *pluginwatcher.Watcher) {
+	m, updateChan := setupDeviceManager(t, devs, callback, socketName)
+	w := setupPluginWatcher(pluginSocketName, m)
+	p := setupDevicePlugin(t, devs, pluginSocketName)
+	return m, updateChan, p, w
 }
 
 func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) {
@@ -305,9 +294,9 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	// Adds three devices for resource1, two healthy and one unhealthy.
 	// Expects capacity for resource1 to be 2.
 	resourceName1 := "domain1.com/resource1"
-	e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e1 := &endpointImpl{}
 	testManager.endpoints[resourceName1] = e1
-	callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
+	callback(resourceName1, devs)
 	capacity, allocatable, removedResources := testManager.GetCapacity()
 	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
@@ -318,7 +307,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	as.Equal(0, len(removedResources))
 
 	// Deletes an unhealthy device should NOT change allocatable but change capacity.
-	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
+	devs1 := devs[:len(devs)-1]
+	callback(resourceName1, devs1)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
@@ -329,34 +319,34 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	as.Equal(0, len(removedResources))
 
 	// Updates a healthy device to unhealthy should reduce allocatable by 1.
-	dev2 := devs[1]
-	dev2.Health = pluginapi.Unhealthy
-	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
+	devs[1].Health = pluginapi.Unhealthy
+	callback(resourceName1, devs)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
 	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
 	as.True(ok)
-	as.Equal(int64(2), resource1Capacity.Value())
+	as.Equal(int64(3), resource1Capacity.Value())
 	as.Equal(int64(1), resource1Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
 	// Deletes a healthy device should reduce capacity and allocatable by 1.
-	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
+	devs2 := devs[1:]
+	callback(resourceName1, devs2)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
 	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
 	as.True(ok)
 	as.Equal(int64(0), resource1Allocatable.Value())
-	as.Equal(int64(1), resource1Capacity.Value())
+	as.Equal(int64(2), resource1Capacity.Value())
 	as.Equal(0, len(removedResources))
 
 	// Tests adding another resource.
 	resourceName2 := "resource2"
-	e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e2 := &endpointImpl{}
 	testManager.endpoints[resourceName2] = e2
-	callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
+	callback(resourceName2, devs)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	as.Equal(2, len(capacity))
 	resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
@@ -364,7 +354,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
 	as.True(ok)
 	as.Equal(int64(3), resource2Capacity.Value())
-	as.Equal(int64(2), resource2Allocatable.Value())
+	as.Equal(int64(1), resource2Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
 	// Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
@@ -552,11 +542,7 @@ type MockEndpoint struct {
 func (m *MockEndpoint) stop() {}
 func (m *MockEndpoint) run() {}
 
-func (m *MockEndpoint) getDevices() []pluginapi.Device {
-	return []pluginapi.Device{}
-}
-
-func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}
+func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {}
 
 func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
 	m.initChan <- devs
@@ -592,7 +578,7 @@ func makePod(limits v1.ResourceList) *v1.Pod {
 }
 
 func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) {
-	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
 	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
 	if err != nil {
 		return nil, err
@@ -858,7 +844,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
 	devID2 := "dev2"
 
 	as := assert.New(t)
-	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
 	tmpDir, err := ioutil.TempDir("", "checkpoint")
 	as.Nil(err)
 
@@ -20,7 +20,6 @@ import (
 	"time"
 
 	"k8s.io/api/core/v1"
-	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -33,11 +32,6 @@ type Manager interface {
 	// Start starts device plugin registration service.
 	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
 
-	// Devices is the map of devices that have registered themselves
-	// against the manager.
-	// The map key is the ResourceName of the device plugins.
-	Devices() map[string][]pluginapi.Device
-
 	// Allocate configures and assigns devices to pods. The pods are provided
 	// through the pod admission attributes in the attrs argument. From the
 	// requested device resources, Allocate will communicate with the owning