While reviewing the devicemanager code, I found that the caching layer on endpoint is redundant.

Here are the three related objects in the picture:
devicemanager <-> endpoint <-> plugin

The plugin is the source of truth for devices and their health status.

devicemanager maintains healthyDevices, unhealthyDevices, and allocatedDevices based on updates from the plugin; the resulting update path is condensed below.
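
Condensed from the manager.go hunk below: the plugin pushes its full device list on every change, and the manager rebuilds both health sets from scratch (allocatedDevices is untouched here):

    func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
        m.mutex.Lock()
        // The plugin's full list is authoritative: rebuild both sets from scratch.
        m.healthyDevices[resourceName] = sets.NewString()
        m.unhealthyDevices[resourceName] = sets.NewString()
        for _, dev := range devices {
            if dev.Health == pluginapi.Healthy {
                m.healthyDevices[resourceName].Insert(dev.ID)
            } else {
                m.unhealthyDevices[resourceName].Insert(dev.ID)
            }
        }
        m.mutex.Unlock()
        m.writeCheckpoint()
    }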

So there is no point in the endpoint caching devices; this patch removes that caching layer from the endpoint.

Also removing Manager.Devices(), since I didn't find any caller of it other than tests; instead I am adding a notification channel to facilitate testing, sketched below.
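
The notification pattern, condensed from the test changes below: wrap the manager's callback so every plugin update also signals a channel the test can block on:

    updateChan := make(chan interface{})
    originalCallback := m.callback
    m.callback = func(resourceName string, devices []pluginapi.Device) {
        originalCallback(resourceName, devices)
        updateChan <- new(interface{})
    }

A test then waits on the channel instead of counting callbacks:

    select {
    case <-updateChan:
    case <-time.After(5 * time.Second):
        t.Fatalf("timeout while waiting for manager update")
    }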

If we need to get all devices from the manager in the future, it just needs to return healthyDevices + unhealthyDevices; we don't have to call the endpoint at all. A hypothetical accessor is sketched below.
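
A hypothetical accessor along those lines (not part of this patch; note it could only return device IDs, since the full pluginapi.Device objects are no longer cached on our side):

    // Sketch only: union of healthy and unhealthy device IDs per resource.
    func (m *ManagerImpl) allDeviceIDs() map[string][]string {
        m.mutex.Lock()
        defer m.mutex.Unlock()
        out := make(map[string][]string)
        for resource, ids := range m.healthyDevices {
            out[resource] = append(out[resource], ids.UnsortedList()...)
        }
        for resource, ids := range m.unhealthyDevices {
            out[resource] = append(out[resource], ids.UnsortedList()...)
        }
        return out
    }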

This patch makes the code more readable and simplifies the data model.
hui luo 2018-07-29 17:50:18 -07:00
parent 28b6fb5f7d
commit 7101c17498
6 changed files with 143 additions and 305 deletions

pkg/kubelet/cm/devicemanager/endpoint.go

@@ -37,8 +37,7 @@ type endpoint interface {
 	stop()
 	allocate(devs []string) (*pluginapi.AllocateResponse, error)
 	preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
-	getDevices() []pluginapi.Device
-	callback(resourceName string, added, updated, deleted []pluginapi.Device)
+	callback(resourceName string, devices []pluginapi.Device)
 	isStopped() bool
 	stopGracePeriodExpired() bool
 }
@@ -51,15 +50,13 @@ type endpointImpl struct {
 	resourceName string
 	stopTime     time.Time

-	devices map[string]pluginapi.Device
-	mutex   sync.Mutex
-	cb      monitorCallback
+	mutex sync.Mutex
+	cb    monitorCallback
 }

 // newEndpoint creates a new endpoint for the given resourceName.
 // This is to be used during normal device plugin registration.
-func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
+func newEndpointImpl(socketPath, resourceName string, callback monitorCallback) (*endpointImpl, error) {
 	client, c, err := dial(socketPath)
 	if err != nil {
 		glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
@@ -73,41 +70,26 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina
 		socketPath:   socketPath,
 		resourceName: resourceName,

-		devices: devices,
-		cb:      callback,
+		cb: callback,
 	}, nil
 }

 // newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set.
 // This is to be used during Kubelet restart, before the actual device plugin re-registers.
-func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl {
+func newStoppedEndpointImpl(resourceName string) *endpointImpl {
 	return &endpointImpl{
 		resourceName: resourceName,
-		devices:      devices,
 		stopTime:     time.Now(),
 	}
 }

-func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
-	e.cb(resourceName, added, updated, deleted)
-}
-
-func (e *endpointImpl) getDevices() []pluginapi.Device {
-	e.mutex.Lock()
-	defer e.mutex.Unlock()
-	var devs []pluginapi.Device
-	for _, d := range e.devices {
-		devs = append(devs, d)
-	}
-	return devs
+func (e *endpointImpl) callback(resourceName string, devices []pluginapi.Device) {
+	e.cb(resourceName, devices)
 }

 // run initializes ListAndWatch gRPC call for the device plugin and
 // blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
-// stream update contains a new list of device states. listAndWatch compares the new
-// device states with its cached states to get list of new, updated, and deleted devices.
+// stream update contains a new list of device states.
 // It then issues a callback to pass this information to the device manager which
 // will adjust the resource available information accordingly.
 func (e *endpointImpl) run() {
@@ -118,14 +100,6 @@ func (e *endpointImpl) run() {
 		return
 	}

-	devices := make(map[string]pluginapi.Device)
-
-	e.mutex.Lock()
-	for _, d := range e.devices {
-		devices[d.ID] = d
-	}
-	e.mutex.Unlock()
-
 	for {
 		response, err := stream.Recv()
 		if err != nil {
@@ -136,57 +110,12 @@ func (e *endpointImpl) run() {
 		devs := response.Devices
 		glog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

-		newDevs := make(map[string]*pluginapi.Device)
-		var added, updated []pluginapi.Device
-
+		var newDevs []pluginapi.Device
 		for _, d := range devs {
-			dOld, ok := devices[d.ID]
-			newDevs[d.ID] = d
-
-			if !ok {
-				glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d)
-
-				devices[d.ID] = *d
-				added = append(added, *d)
-
-				continue
-			}
-
-			if d.Health == dOld.Health {
-				continue
-			}
-
-			if d.Health == pluginapi.Unhealthy {
-				glog.Errorf("Device %s is now Unhealthy", d.ID)
-			} else if d.Health == pluginapi.Healthy {
-				glog.V(2).Infof("Device %s is now Healthy", d.ID)
-			}
-
-			devices[d.ID] = *d
-			updated = append(updated, *d)
+			newDevs = append(newDevs, *d)
 		}

-		var deleted []pluginapi.Device
-		for id, d := range devices {
-			if _, ok := newDevs[id]; ok {
-				continue
-			}
-
-			glog.Errorf("Device %s was deleted", d.ID)
-
-			deleted = append(deleted, d)
-			delete(devices, id)
-		}
-
-		e.mutex.Lock()
-		// NOTE: Return a copy of 'devices' instead of returning a direct reference to local 'devices'
-		e.devices = make(map[string]pluginapi.Device)
-		for _, d := range devices {
-			e.devices[d.ID] = d
-		}
-		e.mutex.Unlock()
-
-		e.callback(e.resourceName, added, updated, deleted)
+		e.callback(e.resourceName, newDevs)
 	}
 }

pkg/kubelet/cm/devicemanager/endpoint_test.go

@@ -37,7 +37,7 @@ func TestNewEndpoint(t *testing.T) {
 		{ID: "ADeviceId", Health: pluginapi.Healthy},
 	}

-	p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
+	p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {})
 	defer ecleanup(t, p, e)
 }
@@ -58,7 +58,7 @@ func TestRun(t *testing.T) {
 	callbackCount := 0
 	callbackChan := make(chan int)
-	callback := func(n string, a, u, r []pluginapi.Device) {
+	callback := func(n string, devices []pluginapi.Device) {
 		// Should be called twice:
 		// one for plugin registration, one for plugin update.
 		if callbackCount > 2 {
@@ -67,23 +67,24 @@ func TestRun(t *testing.T) {
 		// Check plugin registration
 		if callbackCount == 0 {
-			require.Len(t, a, 3)
-			require.Len(t, u, 0)
-			require.Len(t, r, 0)
+			require.Len(t, devices, 3)
+			require.Equal(t, devices[0].ID, devs[0].ID)
+			require.Equal(t, devices[1].ID, devs[1].ID)
+			require.Equal(t, devices[2].ID, devs[2].ID)
+			require.Equal(t, devices[0].Health, devs[0].Health)
+			require.Equal(t, devices[1].Health, devs[1].Health)
+			require.Equal(t, devices[2].Health, devs[2].Health)
 		}

 		// Check plugin update
 		if callbackCount == 1 {
-			require.Len(t, a, 1)
-			require.Len(t, u, 2)
-			require.Len(t, r, 1)
-
-			require.Equal(t, a[0].ID, updated[2].ID)
-			require.Equal(t, u[0].ID, updated[0].ID)
-			require.Equal(t, u[0].Health, updated[0].Health)
-			require.Equal(t, u[1].ID, updated[1].ID)
-			require.Equal(t, u[1].Health, updated[1].Health)
-			require.Equal(t, r[0].ID, devs[1].ID)
+			require.Len(t, devices, 3)
+			require.Equal(t, devices[0].ID, updated[0].ID)
+			require.Equal(t, devices[1].ID, updated[1].ID)
+			require.Equal(t, devices[2].ID, updated[2].ID)
+			require.Equal(t, devices[0].Health, updated[0].Health)
+			require.Equal(t, devices[1].Health, updated[1].Health)
+			require.Equal(t, devices[2].Health, updated[2].Health)
 		}

 		callbackCount++
@@ -102,18 +103,7 @@ func TestRun(t *testing.T) {
 	// Wait for the second callback to be issued.
 	<-callbackChan

-	e.mutex.Lock()
-	defer e.mutex.Unlock()
-
-	require.Len(t, e.devices, 3)
-
-	for _, dref := range updated {
-		d, ok := e.devices[dref.ID]
-
-		require.True(t, ok)
-		require.Equal(t, d.ID, dref.ID)
-		require.Equal(t, d.Health, dref.Health)
-	}
+	require.Equal(t, callbackCount, 2)
 }
func TestAllocate(t *testing.T) { func TestAllocate(t *testing.T) {
@ -123,7 +113,7 @@ func TestAllocate(t *testing.T) {
} }
callbackCount := 0 callbackCount := 0
callbackChan := make(chan int) callbackChan := make(chan int)
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) { p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {
callbackCount++ callbackCount++
callbackChan <- callbackCount callbackChan <- callbackCount
}) })
@@ -169,23 +159,13 @@ func TestAllocate(t *testing.T) {
 	require.Equal(t, resp, respOut)
 }

-func TestGetDevices(t *testing.T) {
-	e := endpointImpl{
-		devices: map[string]pluginapi.Device{
-			"ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy},
-		},
-	}
-	devs := e.getDevices()
-	require.Len(t, devs, 1)
-}
-
 func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
 	p := NewDevicePluginStub(devs, socket, resourceName, false)

 	err := p.Start()
 	require.NoError(t, err)

-	e, err := newEndpointImpl(socket, resourceName, make(map[string]pluginapi.Device), callback)
+	e, err := newEndpointImpl(socket, resourceName, callback)
 	require.NoError(t, err)

 	return p, e

pkg/kubelet/cm/devicemanager/manager.go

@@ -49,7 +49,7 @@ type ActivePodsFunc func() []*v1.Pod
 // monitorCallback is the function called when a device's health state changes,
 // or new devices are reported, or old devices are deleted.
 // Updated contains the most recent state of the Device.
-type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
+type monitorCallback func(resourceName string, devices []pluginapi.Device)

 // ManagerImpl is the structure in charge of managing Device Plugins.
 type ManagerImpl struct {
@@ -133,28 +133,17 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
 	return manager, nil
 }

-func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
-	kept := append(updated, added...)
-
+func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
 	m.mutex.Lock()
-	if _, ok := m.healthyDevices[resourceName]; !ok {
-		m.healthyDevices[resourceName] = sets.NewString()
-	}
-	if _, ok := m.unhealthyDevices[resourceName]; !ok {
-		m.unhealthyDevices[resourceName] = sets.NewString()
-	}
-	for _, dev := range kept {
+	m.healthyDevices[resourceName] = sets.NewString()
+	m.unhealthyDevices[resourceName] = sets.NewString()
+	for _, dev := range devices {
 		if dev.Health == pluginapi.Healthy {
 			m.healthyDevices[resourceName].Insert(dev.ID)
-			m.unhealthyDevices[resourceName].Delete(dev.ID)
 		} else {
 			m.unhealthyDevices[resourceName].Insert(dev.ID)
-			m.healthyDevices[resourceName].Delete(dev.ID)
 		}
 	}
-	for _, dev := range deleted {
-		m.healthyDevices[resourceName].Delete(dev.ID)
-		m.unhealthyDevices[resourceName].Delete(dev.ID)
-	}
 	m.mutex.Unlock()
 	m.writeCheckpoint()
 }
@@ -277,21 +266,6 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
 	return false
 }

-// Devices is the map of devices that are known by the Device
-// Plugin manager with the kind of the devices as key
-func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
-
-	devs := make(map[string][]pluginapi.Device)
-	for k, e := range m.endpoints {
-		glog.V(3).Infof("Endpoint: %+v: %p", k, e)
-		devs[k] = e.getDevices()
-	}
-
-	return devs
-}
-
 // Allocate is the call that you can use to allocate a set of devices
 // from the registered device plugins.
 func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
@@ -376,7 +350,7 @@ func (m *ManagerImpl) Stop() error {
 func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) {
 	chanForAckOfNotification := make(chan bool)

-	new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback)
+	new, err := newEndpointImpl(socketPath, resourceName, m.callback)
 	if err != nil {
 		glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
 		return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
@@ -407,26 +381,8 @@ func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.D
 	if options != nil {
 		m.pluginOpts[resourceName] = options
 	}
-	old, ok := m.endpoints[resourceName]
-	if ok && old != nil {
-		// Pass devices of previous endpoint into re-registered one,
-		// to avoid potential orphaned devices upon re-registration
-		devices := make(map[string]pluginapi.Device)
-		for _, device := range old.getDevices() {
-			device.Health = pluginapi.Unhealthy
-			devices[device.ID] = device
-		}
-		e.devices = devices
-	}
-	// Associates the newly created endpoint with the corresponding resource name.
-	// Stops existing endpoint if there is any.
 	m.endpoints[resourceName] = e
 	glog.V(2).Infof("Registered endpoint %v", e)
-
-	if old != nil {
-		old.stop()
-	}
-
-	return
 }

 func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) {
@@ -441,13 +397,12 @@ func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) {
 }

 func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
-	new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback)
+	new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
 	if err != nil {
 		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
 		return
 	}
 	m.registerEndpoint(r.ResourceName, r.Options, new)
-
 	go func() {
 		m.runEndpoint(r.ResourceName, new)
 	}()
@@ -567,7 +522,7 @@ func (m *ManagerImpl) readCheckpoint() error {
 		// will stay zero till the corresponding device plugin re-registers.
 		m.healthyDevices[resource] = sets.NewString()
 		m.unhealthyDevices[resource] = sets.NewString()
-		m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device))
+		m.endpoints[resource] = newStoppedEndpointImpl(resource)
 	}
 	return nil
 }

pkg/kubelet/cm/devicemanager/manager_stub.go

@@ -18,7 +18,6 @@ package devicemanager

 import (
 	"k8s.io/api/core/v1"
-	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
 	"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
@@ -43,11 +42,6 @@ func (h *ManagerStub) Stop() error {
 	return nil
 }

-// Devices returns an empty map.
-func (h *ManagerStub) Devices() map[string][]pluginapi.Device {
-	return make(map[string][]pluginapi.Device)
-}
-
 // Allocate simply returns nil.
 func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
 	return nil

pkg/kubelet/cm/devicemanager/manager_test.go

@@ -22,7 +22,6 @@ import (
 	"os"
 	"path/filepath"
 	"reflect"
-	"sync/atomic"
 	"testing"
 	"time"
@@ -68,7 +67,9 @@ func TestNewManagerImplStart(t *testing.T) {
 	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
 	require.NoError(t, err)
 	defer os.RemoveAll(socketDir)
-	m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false)
+	m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
+	cleanup(t, m, p, nil)
+	// Stop should tolerate being called more than once.
 	cleanup(t, m, p, nil)
 }
@@ -76,7 +77,7 @@ func TestNewManagerImplStartProbeMode(t *testing.T) {
 	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
 	require.NoError(t, err)
 	defer os.RemoveAll(socketDir)
-	m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
+	m, _, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
 	cleanup(t, m, p, w)
 }
@@ -95,68 +96,55 @@ func TestDevicePluginReRegistration(t *testing.T) {
 		{ID: "Dev3", Health: pluginapi.Healthy},
 	}
 	for _, preStartContainerFlag := range []bool{false, true} {
-		expCallbackCount := int32(0)
-		callbackCount := int32(0)
-		callbackChan := make(chan int32)
-		callback := func(n string, a, u, r []pluginapi.Device) {
-			callbackCount++
-			if callbackCount > atomic.LoadInt32(&expCallbackCount) {
-				t.FailNow()
-			}
-			callbackChan <- callbackCount
-		}
-		m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag)
-		atomic.StoreInt32(&expCallbackCount, 1)
+		m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
 		p1.Register(socketName, testResourceName, "")
-		// Wait for the first callback to be issued.
+
 		select {
-		case <-callbackChan:
-			break
-		case <-time.After(time.Second):
-			t.FailNow()
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout while waiting for manager update")
 		}
-		devices := m.Devices()
-		require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
+		capacity, allocatable, _ := m.GetCapacity()
+		resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
+		resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
+		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+		require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

 		p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
 		err = p2.Start()
 		require.NoError(t, err)
-		atomic.StoreInt32(&expCallbackCount, 2)
 		p2.Register(socketName, testResourceName, "")
-		// Wait for the second callback to be issued.
-		select {
-		case <-callbackChan:
-			break
-		case <-time.After(time.Second):
-			t.FailNow()
-		}
-		devices2 := m.Devices()
-		require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
+
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout while waiting for manager update")
+		}
+		capacity, allocatable, _ = m.GetCapacity()
+		resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+		resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+		require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")

 		// Test the scenario that a plugin re-registers with different devices.
 		p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
 		err = p3.Start()
 		require.NoError(t, err)
-		atomic.StoreInt32(&expCallbackCount, 3)
 		p3.Register(socketName, testResourceName, "")
-		// Wait for the second callback to be issued.
-		select {
-		case <-callbackChan:
-			break
-		case <-time.After(time.Second):
-			t.FailNow()
-		}
-		devices3 := m.Devices()
-		require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
+
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timeout while waiting for manager update")
+		}
+		capacity, allocatable, _ = m.GetCapacity()
+		resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+		resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+		require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+		require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
 		p2.Stop()
 		p3.Stop()
 		cleanup(t, m, p1, nil)
-		close(callbackChan)
 	}
 }
@@ -177,105 +165,106 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
 		{ID: "Dev3", Health: pluginapi.Healthy},
 	}

-	expCallbackCount := int32(0)
-	callbackCount := int32(0)
-	callbackChan := make(chan int32)
-	callback := func(n string, a, u, r []pluginapi.Device) {
-		callbackCount++
-		if callbackCount > atomic.LoadInt32(&expCallbackCount) {
-			t.FailNow()
-		}
-		callbackChan <- callbackCount
-	}
-	m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName)
-	atomic.StoreInt32(&expCallbackCount, 1)
+	m, ch, p1, w := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)

 	// Wait for the first callback to be issued.
 	select {
-	case <-callbackChan:
-		break
-	case <-time.After(time.Second):
+	case <-ch:
+	case <-time.After(5 * time.Second):
 		t.FailNow()
 	}
-	devices := m.Devices()
-	require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
+	capacity, allocatable, _ := m.GetCapacity()
+	resourceCapacity, _ := capacity[v1.ResourceName(testResourceName)]
+	resourceAllocatable, _ := allocatable[v1.ResourceName(testResourceName)]
+	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

 	p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
 	err = p2.Start()
 	require.NoError(t, err)
-	atomic.StoreInt32(&expCallbackCount, 2)
 	// Wait for the second callback to be issued.
 	select {
-	case <-callbackChan:
-		break
-	case <-time.After(time.Second):
+	case <-ch:
+	case <-time.After(5 * time.Second):
 		t.FailNow()
 	}
-	devices2 := m.Devices()
-	require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
+	capacity, allocatable, _ = m.GetCapacity()
+	resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+	resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

 	// Test the scenario that a plugin re-registers with different devices.
 	p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
 	err = p3.Start()
 	require.NoError(t, err)
-	atomic.StoreInt32(&expCallbackCount, 3)
-	// Wait for the second callback to be issued.
+	// Wait for the third callback to be issued.
 	select {
-	case <-callbackChan:
-		break
-	case <-time.After(time.Second):
+	case <-ch:
+	case <-time.After(5 * time.Second):
 		t.FailNow()
 	}
-	devices3 := m.Devices()
-	require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
+	capacity, allocatable, _ = m.GetCapacity()
+	resourceCapacity, _ = capacity[v1.ResourceName(testResourceName)]
+	resourceAllocatable, _ = allocatable[v1.ResourceName(testResourceName)]
+	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
+	require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed")
 	p2.Stop()
 	p3.Stop()
 	cleanup(t, m, p1, w)
-	close(callbackChan)
 }

-func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) {
+func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
 	m, err := newManagerImpl(socketName)
 	require.NoError(t, err)
+	updateChan := make(chan interface{})

-	m.callback = callback
+	if callback != nil {
+		m.callback = callback
+	}

+	originalCallback := m.callback
+	m.callback = func(resourceName string, devices []pluginapi.Device) {
+		originalCallback(resourceName, devices)
+		updateChan <- new(interface{})
+	}
 	activePods := func() []*v1.Pod {
 		return []*v1.Pod{}
 	}
 	err = m.Start(activePods, &sourcesReadyStub{})
 	require.NoError(t, err)

-	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag)
-	err = p.Start()
-	require.NoError(t, err)
-
-	return m, p
+	return m, updateChan
 }

-func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) {
-	w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
-
-	m, err := newManagerImpl(socketName)
+func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub {
+	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false)
+	err := p.Start()
 	require.NoError(t, err)
+	return p
+}

+func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
+	w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
 	w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback())
 	w.Start()
+	return &w
+}

-	m.callback = callback
+func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
+	m, updateChan := setupDeviceManager(t, devs, callback, socketName)
+	p := setupDevicePlugin(t, devs, pluginSocketName)
+	return m, updateChan, p
+}

-	activePods := func() []*v1.Pod {
-		return []*v1.Pod{}
-	}
-	err = m.Start(activePods, &sourcesReadyStub{})
-	require.NoError(t, err)
-
-	p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/)
-	err = p.Start()
-	require.NoError(t, err)
-
-	return m, p, &w
+func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, *pluginwatcher.Watcher) {
+	m, updateChan := setupDeviceManager(t, devs, callback, socketName)
+	w := setupPluginWatcher(pluginSocketName, m)
+	p := setupDevicePlugin(t, devs, pluginSocketName)
+	return m, updateChan, p, w
 }

 func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) {
@@ -305,9 +294,9 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	// Adds three devices for resource1, two healthy and one unhealthy.
 	// Expects capacity for resource1 to be 2.
 	resourceName1 := "domain1.com/resource1"
-	e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e1 := &endpointImpl{}
 	testManager.endpoints[resourceName1] = e1
-	callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
+	callback(resourceName1, devs)
 	capacity, allocatable, removedResources := testManager.GetCapacity()
 	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
@@ -318,7 +307,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	as.Equal(0, len(removedResources))

 	// Deletes an unhealthy device should NOT change allocatable but change capacity.
-	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
+	devs1 := devs[:len(devs)-1]
+	callback(resourceName1, devs1)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
@@ -329,34 +319,34 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	as.Equal(0, len(removedResources))

 	// Updates a healthy device to unhealthy should reduce allocatable by 1.
-	dev2 := devs[1]
-	dev2.Health = pluginapi.Unhealthy
-	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{dev2}, []pluginapi.Device{})
+	devs[1].Health = pluginapi.Unhealthy
+	callback(resourceName1, devs)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
 	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
 	as.True(ok)
-	as.Equal(int64(2), resource1Capacity.Value())
+	as.Equal(int64(3), resource1Capacity.Value())
 	as.Equal(int64(1), resource1Allocatable.Value())
 	as.Equal(0, len(removedResources))

 	// Deletes a healthy device should reduce capacity and allocatable by 1.
-	callback(resourceName1, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[0]})
+	devs2 := devs[1:]
+	callback(resourceName1, devs2)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
 	as.True(ok)
 	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
 	as.True(ok)
 	as.Equal(int64(0), resource1Allocatable.Value())
-	as.Equal(int64(1), resource1Capacity.Value())
+	as.Equal(int64(2), resource1Capacity.Value())
 	as.Equal(0, len(removedResources))

 	// Tests adding another resource.
 	resourceName2 := "resource2"
-	e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e2 := &endpointImpl{}
 	testManager.endpoints[resourceName2] = e2
-	callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
+	callback(resourceName2, devs)
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	as.Equal(2, len(capacity))
 	resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
@@ -364,7 +354,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
 	as.True(ok)
 	as.Equal(int64(3), resource2Capacity.Value())
-	as.Equal(int64(2), resource2Allocatable.Value())
+	as.Equal(int64(1), resource2Allocatable.Value())
 	as.Equal(0, len(removedResources))

 	// Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
@@ -552,11 +542,7 @@ type MockEndpoint struct {
 func (m *MockEndpoint) stop() {}
 func (m *MockEndpoint) run()  {}

-func (m *MockEndpoint) getDevices() []pluginapi.Device {
-	return []pluginapi.Device{}
-}
-
-func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}
+func (m *MockEndpoint) callback(resourceName string, devices []pluginapi.Device) {}

 func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
 	m.initChan <- devs
@@ -592,7 +578,7 @@ func makePod(limits v1.ResourceList) *v1.Pod {
 }

 func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) {
-	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
 	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
 	if err != nil {
 		return nil, err
@@ -858,7 +844,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
 	devID2 := "dev2"

 	as := assert.New(t)
-	monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
+	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
 	tmpDir, err := ioutil.TempDir("", "checkpoint")
 	as.Nil(err)

pkg/kubelet/cm/devicemanager/types.go

@@ -20,7 +20,6 @@ import (
 	"time"

 	"k8s.io/api/core/v1"
-	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -33,11 +32,6 @@ type Manager interface {
 	// Start starts device plugin registration service.
 	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error

-	// Devices is the map of devices that have registered themselves
-	// against the manager.
-	// The map key is the ResourceName of the device plugins.
-	Devices() map[string][]pluginapi.Device
-
 	// Allocate configures and assigns devices to pods. The pods are provided
 	// through the pod admission attributes in the attrs argument. From the
 	// requested device resources, Allocate will communicate with the owning