Merge pull request #92665 from klueska/upstream-add-get-preferred-allocation-api

Add GetPreferredAllocation() call to the v1beta1 device plugin API
This commit is contained in:
Kubernetes Prow Robot 2020-07-03 20:31:16 -07:00 committed by GitHub
commit efb56da4a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1557 additions and 206 deletions

View File

@ -33,10 +33,11 @@ import (
// Stub implementation for DevicePlugin.
type Stub struct {
devs []*pluginapi.Device
socket string
resourceName string
preStartContainerFlag bool
devs []*pluginapi.Device
socket string
resourceName string
preStartContainerFlag bool
getPreferredAllocationFlag bool
stop chan interface{}
wg sync.WaitGroup
@ -47,12 +48,24 @@ type Stub struct {
// allocFunc is used for handling allocation request
allocFunc stubAllocFunc
// getPreferredAllocFunc is used for handling getPreferredAllocation request
getPreferredAllocFunc stubGetPreferredAllocFunc
registrationStatus chan watcherapi.RegistrationStatus // for testing
endpoint string // for testing
}
// stubAllocFunc is the function called when receive an allocation request from Kubelet
// stubGetPreferredAllocFunc is the function called when a getPreferredAllocation request is received from Kubelet
type stubGetPreferredAllocFunc func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error)
func defaultGetPreferredAllocFunc(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
var response pluginapi.PreferredAllocationResponse
return &response, nil
}
// stubAllocFunc is the function called when an allocation request is received from Kubelet
type stubAllocFunc func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error)
func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) {
@ -62,20 +75,27 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De
}
// NewDevicePluginStub returns an initialized DevicePlugin Stub.
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool) *Stub {
func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool, getPreferredAllocationFlag bool) *Stub {
return &Stub{
devs: devs,
socket: socket,
resourceName: name,
preStartContainerFlag: preStartContainerFlag,
devs: devs,
socket: socket,
resourceName: name,
preStartContainerFlag: preStartContainerFlag,
getPreferredAllocationFlag: getPreferredAllocationFlag,
stop: make(chan interface{}),
update: make(chan []*pluginapi.Device),
allocFunc: defaultAllocFunc,
allocFunc: defaultAllocFunc,
getPreferredAllocFunc: defaultGetPreferredAllocFunc,
}
}
// SetGetPreferredAllocFunc sets allocFunc of the device plugin
func (m *Stub) SetGetPreferredAllocFunc(f stubGetPreferredAllocFunc) {
m.getPreferredAllocFunc = f
}
// SetAllocFunc sets allocFunc of the device plugin
func (m *Stub) SetAllocFunc(f stubAllocFunc) {
m.allocFunc = f
@ -174,7 +194,10 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
Options: &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag},
Options: &pluginapi.DevicePluginOptions{
PreStartRequired: m.preStartContainerFlag,
GetPreferredAllocationAvailable: m.getPreferredAllocationFlag,
},
}
_, err = client.Register(context.Background(), reqt)
@ -186,7 +209,11 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir stri
// GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin.
func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag}, nil
options := &pluginapi.DevicePluginOptions{
PreStartRequired: m.preStartContainerFlag,
GetPreferredAllocationAvailable: m.getPreferredAllocationFlag,
}
return options, nil
}
// PreStartContainer resets the devices received
@ -216,6 +243,19 @@ func (m *Stub) Update(devs []*pluginapi.Device) {
m.update <- devs
}
// GetPreferredAllocation gets the preferred allocation from a set of available devices
func (m *Stub) GetPreferredAllocation(ctx context.Context, r *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
klog.Infof("GetPreferredAllocation, %+v", r)
devs := make(map[string]pluginapi.Device)
for _, dev := range m.devs {
devs[dev.ID] = *dev
}
return m.getPreferredAllocFunc(r, devs)
}
// Allocate does a mock allocation
func (m *Stub) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
klog.Infof("Allocate, %+v", r)

View File

@ -35,6 +35,7 @@ import (
type endpoint interface {
run()
stop()
getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
allocate(devs []string) (*pluginapi.AllocateResponse, error)
preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
callback(resourceName string, devices []pluginapi.Device)
@ -138,6 +139,22 @@ func (e *endpointImpl) setStopTime(t time.Time) {
e.stopTime = t
}
// getPreferredAllocation issues GetPreferredAllocation gRPC call to the device plugin.
func (e *endpointImpl) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
if e.isStopped() {
return nil, fmt.Errorf(errEndpointStopped, e)
}
return e.client.GetPreferredAllocation(context.Background(), &pluginapi.PreferredAllocationRequest{
ContainerRequests: []*pluginapi.ContainerPreferredAllocationRequest{
{
AvailableDeviceIDs: available,
MustIncludeDeviceIDs: mustInclude,
AllocationSize: int32(size),
},
},
})
}
// allocate issues Allocate gRPC call to the device plugin.
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
if e.isStopped() {

View File

@ -159,8 +159,42 @@ func TestAllocate(t *testing.T) {
require.Equal(t, resp, respOut)
}
func TestGetPreferredAllocation(t *testing.T) {
socket := path.Join("/tmp", esocketName)
callbackCount := 0
callbackChan := make(chan int)
p, e := esetup(t, []*pluginapi.Device{}, socket, "mock", func(n string, d []pluginapi.Device) {
callbackCount++
callbackChan <- callbackCount
})
defer ecleanup(t, p, e)
resp := &pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: []string{"device0", "device1", "device2"}},
},
}
p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
return resp, nil
})
go e.run()
// Wait for the callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
respOut, err := e.getPreferredAllocation([]string{}, []string{}, -1)
require.NoError(t, err)
require.Equal(t, resp, respOut)
}
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
p := NewDevicePluginStub(devs, socket, resourceName, false)
p := NewDevicePluginStub(devs, socket, resourceName, false, false)
err := p.Start()
require.NoError(t, err)

View File

@ -43,7 +43,6 @@ import (
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
@ -658,49 +657,107 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi
return nil, nil
}
klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
// Needs to allocate additional devices.
// Check if resource registered with devicemanager
if _, ok := m.healthyDevices[resource]; !ok {
return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
}
devices = sets.NewString()
// Allocates from reusableDevices list first.
for device := range reusableDevices {
devices.Insert(device)
needed--
if needed == 0 {
return devices, nil
// Declare the list of allocated devices.
// This will be populated and returned below.
allocated := sets.NewString()
// Create a closure to help with device allocation
// Returns 'true' once no more devices need to be allocated.
allocateRemainingFrom := func(devices sets.String) bool {
for device := range devices.Difference(allocated) {
m.allocatedDevices[resource].Insert(device)
allocated.Insert(device)
needed--
if needed == 0 {
return true
}
}
return false
}
// Allocates from reusableDevices list first.
if allocateRemainingFrom(reusableDevices) {
return allocated, nil
}
// Needs to allocate additional devices.
if m.allocatedDevices[resource] == nil {
m.allocatedDevices[resource] = sets.NewString()
}
// Gets Devices in use.
devicesInUse := m.allocatedDevices[resource]
// Gets a list of available devices.
// Gets Available devices.
available := m.healthyDevices[resource].Difference(devicesInUse)
if available.Len() < needed {
return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
}
// By default, pull devices from the unsorted list of available devices.
allocated := available.UnsortedList()[:needed]
// If topology alignment is desired, update allocated to the set of devices
// with the best alignment.
hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
if m.deviceHasTopologyAlignment(resource) && hint.NUMANodeAffinity != nil {
allocated = m.takeByTopology(resource, available, hint.NUMANodeAffinity, needed)
// Filters available Devices based on NUMA affinity.
aligned, unaligned, noAffinity := m.filterByAffinity(podUID, contName, resource, available)
// If we can allocate all remaining devices from the set of aligned ones, then
// give the plugin the chance to influence which ones to allocate from that set.
if needed < aligned.Len() {
// First allocate from the preferred devices list (if available).
preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, aligned.Union(allocated), allocated, required)
if err != nil {
return nil, err
}
if allocateRemainingFrom(preferred.Intersection(aligned.Union(allocated))) {
return allocated, nil
}
// Then fallback to allocate from the aligned set if no preferred list
// is returned (or not enough devices are returned in that list).
if allocateRemainingFrom(aligned) {
return allocated, nil
}
return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
}
// Updates m.allocatedDevices with allocated devices to prevent them
// from being allocated to other pods/containers, given that we are
// not holding lock during the rpc call.
for _, device := range allocated {
m.allocatedDevices[resource].Insert(device)
devices.Insert(device)
// If we can't allocate all remaining devices from the set of aligned ones,
// then start by first allocating all of the aligned devices (to ensure
// that the alignment guaranteed by the TopologyManager is honored).
if allocateRemainingFrom(aligned) {
return allocated, nil
}
return devices, nil
// Then give the plugin the chance to influence the decision on any
// remaining devices to allocate.
preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, available.Union(devices), devices, required)
if err != nil {
return nil, err
}
if allocateRemainingFrom(preferred.Intersection(available.Union(allocated))) {
return allocated, nil
}
// Finally, if the plugin did not return a preferred allocation (or didn't
// return a large enough one), then fall back to allocating the remaining
// devices from the 'unaligned' and 'noAffinity' sets.
if allocateRemainingFrom(unaligned) {
return allocated, nil
}
if allocateRemainingFrom(noAffinity) {
return allocated, nil
}
return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
}
func (m *ManagerImpl) takeByTopology(resource string, available sets.String, affinity bitmask.BitMask, request int) []string {
func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.String) (sets.String, sets.String, sets.String) {
// If alignment information is not available, just pass the available list back.
hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil {
return sets.NewString(), sets.NewString(), available
}
// Build a map of NUMA Nodes to the devices associated with them. A
// device may be associated to multiple NUMA nodes at the same time. If an
// available device does not have any NUMA Nodes associated with it, add it
@ -754,7 +811,7 @@ func (m *ManagerImpl) takeByTopology(resource string, available sets.String, aff
if perNodeDevices[n].Has(d) {
if n == nodeWithoutTopology {
withoutTopology = append(withoutTopology, d)
} else if affinity.IsSet(n) {
} else if hint.NUMANodeAffinity.IsSet(n) {
fromAffinity = append(fromAffinity, d)
} else {
notFromAffinity = append(notFromAffinity, d)
@ -764,8 +821,8 @@ func (m *ManagerImpl) takeByTopology(resource string, available sets.String, aff
}
}
// Concatenate the lists above return the first 'request' devices from it..
return append(append(fromAffinity, notFromAffinity...), withoutTopology...)[:request]
// Return all three lists containing the full set of devices across them.
return sets.NewString(fromAffinity...), sets.NewString(notFromAffinity...), sets.NewString(withoutTopology...)
}
// allocateContainerResources attempts to allocate all of required device
@ -920,6 +977,30 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s
return nil
}
// callGetPreferredAllocationIfAvailable issues GetPreferredAllocation grpc
// call for device plugin resource with GetPreferredAllocationAvailable option set.
func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, resource string, available, mustInclude sets.String, size int) (sets.String, error) {
eI, ok := m.endpoints[resource]
if !ok {
return nil, fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
}
if eI.opts == nil || !eI.opts.GetPreferredAllocationAvailable {
klog.V(4).Infof("Plugin options indicate to skip GetPreferredAllocation for resource: %s", resource)
return nil, nil
}
m.mutex.Unlock()
klog.V(4).Infof("Issuing a GetPreferredAllocation call for container, %s, of pod %s", contName, podUID)
resp, err := eI.e.getPreferredAllocation(available.UnsortedList(), mustInclude.UnsortedList(), size)
m.mutex.Lock()
if err != nil {
return nil, fmt.Errorf("device plugin GetPreferredAllocation rpc failed with err: %v", err)
}
// TODO: Add metrics support for init RPC
return sets.NewString(resp.ContainerResponses[0].DeviceIDs...), nil
}
// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
// and if necessary, updates allocatableResource in nodeInfo to at least equal to
// the allocated capacity. This allows pods that have already been scheduled on

View File

@ -103,55 +103,57 @@ func TestDevicePluginReRegistration(t *testing.T) {
{ID: "Dev3", Health: pluginapi.Healthy},
}
for _, preStartContainerFlag := range []bool{false, true} {
m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
p1.Register(socketName, testResourceName, "")
for _, getPreferredAllocationFlag := range []bool{false, true} {
m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
p1.Register(socketName, testResourceName, "")
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timeout while waiting for manager update")
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timeout while waiting for manager update")
}
capacity, allocatable, _ := m.GetCapacity()
resourceCapacity := capacity[v1.ResourceName(testResourceName)]
resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
err = p2.Start()
require.NoError(t, err)
p2.Register(socketName, testResourceName, "")
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timeout while waiting for manager update")
}
capacity, allocatable, _ = m.GetCapacity()
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
err = p3.Start()
require.NoError(t, err)
p3.Register(socketName, testResourceName, "")
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timeout while waiting for manager update")
}
capacity, allocatable, _ = m.GetCapacity()
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
}
capacity, allocatable, _ := m.GetCapacity()
resourceCapacity := capacity[v1.ResourceName(testResourceName)]
resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
err = p2.Start()
require.NoError(t, err)
p2.Register(socketName, testResourceName, "")
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timeout while waiting for manager update")
}
capacity, allocatable, _ = m.GetCapacity()
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
err = p3.Start()
require.NoError(t, err)
p3.Register(socketName, testResourceName, "")
select {
case <-ch:
case <-time.After(5 * time.Second):
t.Fatalf("timeout while waiting for manager update")
}
capacity, allocatable, _ = m.GetCapacity()
resourceCapacity = capacity[v1.ResourceName(testResourceName)]
resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1)
}
}
@ -186,7 +188,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal to allocatable")
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false)
err = p2.Start()
require.NoError(t, err)
// Wait for the second callback to be issued.
@ -203,7 +205,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false)
err = p3.Start()
require.NoError(t, err)
// Wait for the third callback to be issued.
@ -249,7 +251,7 @@ func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitor
}
func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *Stub {
p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false)
p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false)
err := p.Start()
require.NoError(t, err)
return p
@ -549,8 +551,9 @@ func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
}
type MockEndpoint struct {
allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
initChan chan []string
getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
initChan chan []string
}
func (m *MockEndpoint) stop() {}
@ -563,6 +566,13 @@ func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartCont
return &pluginapi.PreStartContainerResponse{}, nil
}
func (m *MockEndpoint) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
if m.getPreferredAllocationFunc != nil {
return m.getPreferredAllocationFunc(available, mustInclude, size)
}
return nil, nil
}
func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
if m.allocateFunc != nil {
return m.allocateFunc(devs)

View File

@ -17,6 +17,7 @@ limitations under the License.
package devicemanager
import (
"fmt"
"reflect"
"sort"
"testing"
@ -431,14 +432,15 @@ func TestGetTopologyHints(t *testing.T) {
func TestTopologyAlignedAllocation(t *testing.T) {
tcases := []struct {
description string
resource string
request int
devices []pluginapi.Device
allocatedDevices []string
hint topologymanager.TopologyHint
expectedAllocation int
expectedAlignment map[int]int
description string
resource string
request int
devices []pluginapi.Device
allocatedDevices []string
hint topologymanager.TopologyHint
getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
expectedPreferredAllocation []string
expectedAlignment map[int]int
}{
{
description: "Single Request, no alignment",
@ -452,8 +454,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(0, 1),
Preferred: true,
},
expectedAllocation: 1,
expectedAlignment: map[int]int{},
expectedAlignment: map[int]int{},
},
{
description: "Request for 1, partial alignment",
@ -467,8 +468,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(1),
Preferred: true,
},
expectedAllocation: 1,
expectedAlignment: map[int]int{1: 1},
expectedAlignment: map[int]int{1: 1},
},
{
description: "Single Request, socket 0",
@ -482,8 +482,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
expectedAllocation: 1,
expectedAlignment: map[int]int{0: 1},
expectedAlignment: map[int]int{0: 1},
},
{
description: "Single Request, socket 1",
@ -497,8 +496,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(1),
Preferred: true,
},
expectedAllocation: 1,
expectedAlignment: map[int]int{1: 1},
expectedAlignment: map[int]int{1: 1},
},
{
description: "Request for 2, socket 0",
@ -514,8 +512,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
expectedAllocation: 2,
expectedAlignment: map[int]int{0: 2},
expectedAlignment: map[int]int{0: 2},
},
{
description: "Request for 2, socket 1",
@ -531,8 +528,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(1),
Preferred: true,
},
expectedAllocation: 2,
expectedAlignment: map[int]int{1: 2},
expectedAlignment: map[int]int{1: 2},
},
{
description: "Request for 4, unsatisfiable, prefer socket 0",
@ -550,8 +546,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
expectedAllocation: 4,
expectedAlignment: map[int]int{0: 3, 1: 1},
expectedAlignment: map[int]int{0: 3, 1: 1},
},
{
description: "Request for 4, unsatisfiable, prefer socket 1",
@ -569,8 +564,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(1),
Preferred: true,
},
expectedAllocation: 4,
expectedAlignment: map[int]int{0: 1, 1: 3},
expectedAlignment: map[int]int{0: 1, 1: 3},
},
{
description: "Request for 4, multisocket",
@ -590,8 +584,153 @@ func TestTopologyAlignedAllocation(t *testing.T) {
NUMANodeAffinity: makeSocketMask(1, 3),
Preferred: true,
},
expectedAllocation: 4,
expectedAlignment: map[int]int{1: 2, 3: 2},
expectedAlignment: map[int]int{1: 2, 3: 2},
},
{
description: "Request for 5, socket 0, preferred aligned accepted",
resource: "resource",
request: 5,
devices: func() []pluginapi.Device {
devices := []pluginapi.Device{}
for i := 0; i < 100; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 0))
}
for i := 100; i < 200; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 1))
}
return devices
}(),
hint: topologymanager.TopologyHint{
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
getPreferredAllocationFunc: func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: []string{"Dev0", "Dev19", "Dev83", "Dev42", "Dev77"}},
},
}, nil
},
expectedPreferredAllocation: []string{"Dev0", "Dev19", "Dev83", "Dev42", "Dev77"},
expectedAlignment: map[int]int{0: 5},
},
{
description: "Request for 5, socket 0, preferred aligned accepted, unaligned ignored",
resource: "resource",
request: 5,
devices: func() []pluginapi.Device {
devices := []pluginapi.Device{}
for i := 0; i < 100; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 0))
}
for i := 100; i < 200; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 1))
}
return devices
}(),
hint: topologymanager.TopologyHint{
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
getPreferredAllocationFunc: func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: []string{"Dev0", "Dev19", "Dev83", "Dev150", "Dev186"}},
},
}, nil
},
expectedPreferredAllocation: []string{"Dev0", "Dev19", "Dev83"},
expectedAlignment: map[int]int{0: 5},
},
{
description: "Request for 5, socket 1, preferred aligned accepted, bogus ignored",
resource: "resource",
request: 5,
devices: func() []pluginapi.Device {
devices := []pluginapi.Device{}
for i := 0; i < 100; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 1))
}
return devices
}(),
hint: topologymanager.TopologyHint{
NUMANodeAffinity: makeSocketMask(1),
Preferred: true,
},
getPreferredAllocationFunc: func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: []string{"Dev0", "Dev19", "Dev83", "bogus0", "bogus1"}},
},
}, nil
},
expectedPreferredAllocation: []string{"Dev0", "Dev19", "Dev83"},
expectedAlignment: map[int]int{1: 5},
},
{
description: "Request for 5, multisocket, preferred accepted",
resource: "resource",
request: 5,
devices: func() []pluginapi.Device {
devices := []pluginapi.Device{}
for i := 0; i < 3; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 0))
}
for i := 3; i < 100; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 1))
}
return devices
}(),
hint: topologymanager.TopologyHint{
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
getPreferredAllocationFunc: func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: []string{"Dev0", "Dev1", "Dev2", "Dev42", "Dev83"}},
},
}, nil
},
expectedPreferredAllocation: []string{"Dev0", "Dev1", "Dev2", "Dev42", "Dev83"},
expectedAlignment: map[int]int{0: 3, 1: 2},
},
{
description: "Request for 5, multisocket, preferred unaligned accepted, bogus ignored",
resource: "resource",
request: 5,
devices: func() []pluginapi.Device {
devices := []pluginapi.Device{}
for i := 0; i < 3; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 0))
}
for i := 3; i < 100; i++ {
id := fmt.Sprintf("Dev%d", i)
devices = append(devices, makeNUMADevice(id, 1))
}
return devices
}(),
hint: topologymanager.TopologyHint{
NUMANodeAffinity: makeSocketMask(0),
Preferred: true,
},
getPreferredAllocationFunc: func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
return &pluginapi.PreferredAllocationResponse{
ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
{DeviceIDs: []string{"Dev0", "Dev1", "Dev2", "Dev42", "bogus0"}},
},
}, nil
},
expectedPreferredAllocation: []string{"Dev0", "Dev1", "Dev2", "Dev42"},
expectedAlignment: map[int]int{0: 3, 1: 2},
},
}
for _, tc := range tcases {
@ -599,6 +738,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
allDevices: make(map[string]map[string]pluginapi.Device),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpointInfo),
podDevices: make(podDevices),
sourcesReady: &sourcesReadyStub{},
activePods: func() []*v1.Pod { return []*v1.Pod{} },
@ -607,20 +747,34 @@ func TestTopologyAlignedAllocation(t *testing.T) {
m.allDevices[tc.resource] = make(map[string]pluginapi.Device)
m.healthyDevices[tc.resource] = sets.NewString()
m.endpoints[tc.resource] = endpointInfo{}
for _, d := range tc.devices {
m.allDevices[tc.resource][d.ID] = d
m.healthyDevices[tc.resource].Insert(d.ID)
}
if tc.getPreferredAllocationFunc != nil {
m.endpoints[tc.resource] = endpointInfo{
e: &MockEndpoint{
getPreferredAllocationFunc: tc.getPreferredAllocationFunc,
},
opts: &pluginapi.DevicePluginOptions{GetPreferredAllocationAvailable: true},
}
}
allocated, err := m.devicesToAllocate("podUID", "containerName", tc.resource, tc.request, sets.NewString())
if err != nil {
t.Errorf("Unexpected error: %v", err)
continue
}
if len(allocated) != tc.expectedAllocation {
t.Errorf("%v. expected allocation: %v but got: %v", tc.description, tc.expectedAllocation, len(allocated))
if len(allocated) != tc.request {
t.Errorf("%v. expected allocation size: %v but got: %v", tc.description, tc.request, len(allocated))
}
if !allocated.HasAll(tc.expectedPreferredAllocation...) {
t.Errorf("%v. expected preferred allocation: %v but not present in: %v", tc.description, tc.expectedPreferredAllocation, allocated.UnsortedList())
}
alignment := make(map[int]int)

View File

@ -25,8 +25,10 @@ service Registration {
}
message DevicePluginOptions {
// Indicates if PreStartContainer call is required before each container start
// Indicates if PreStartContainer call is required before each container start
bool pre_start_required = 1;
// Indicates if GetPreferredAllocation is implemented and available for calling
bool get_preferred_allocation_available = 2;
}
message RegisterRequest {
@ -37,8 +39,8 @@ message RegisterRequest {
string endpoint = 2;
// Schedulable resource name. As of now it's expected to be a DNS Label
string resource_name = 3;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
// Options to be communicated with Device Manager
DevicePluginOptions options = 4;
}
message Empty {
@ -47,7 +49,7 @@ message Empty {
// DevicePlugin is the service advertised by Device Plugins
service DevicePlugin {
// GetDevicePluginOptions returns options to be communicated with Device
// Manager
// Manager
rpc GetDevicePluginOptions(Empty) returns (DevicePluginOptions) {}
// ListAndWatch returns a stream of List of Devices
@ -55,14 +57,21 @@ service DevicePlugin {
// returns the new list
rpc ListAndWatch(Empty) returns (stream ListAndWatchResponse) {}
// GetPreferredAllocation returns a preferred set of devices to allocate
// from a list of available ones. The resulting preferred allocation is not
// guaranteed to be the allocation ultimately performed by the
// devicemanager. It is only designed to help the devicemanager make a more
// informed allocation decision when possible.
rpc GetPreferredAllocation(PreferredAllocationRequest) returns (PreferredAllocationResponse) {}
// Allocate is called during container creation so that the Device
// Plugin can run device specific operations and instruct Kubelet
// of the steps to make the Device available in the container
rpc Allocate(AllocateRequest) returns (AllocateResponse) {}
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
// PreStartContainer is called, if indicated by Device Plugin during registeration phase,
// before each container start. Device plugin can run device specific operations
// such as resetting the device before making devices available to the container
rpc PreStartContainer(PreStartContainerRequest) returns (PreStartContainerResponse) {}
}
@ -78,7 +87,7 @@ message TopologyInfo {
}
message NUMANode {
int64 ID = 1;
int64 ID = 1;
}
/* E.g:
@ -87,7 +96,7 @@ message NUMANode {
* Health: "Healthy",
* Topology:
* Node:
ID: 1
* ID: 1
*} */
message Device {
// A unique ID assigned by the device plugin used
@ -105,13 +114,41 @@ message Device {
// - PreStartContainer allows Device Plugin to run device specific operations on
// the Devices requested
message PreStartContainerRequest {
repeated string devicesIDs = 1;
repeated string devicesIDs = 1;
}
// PreStartContainerResponse will be send by plugin in response to PreStartContainerRequest
message PreStartContainerResponse {
}
// PreferredAllocationRequest is passed via a call to GetPreferredAllocation()
// at pod admission time. The device plugin should take the list of
// `available_deviceIDs` and calculate a preferred allocation of size
// 'allocation_size' from them, making sure to include the set of devices
// listed in 'must_include_deviceIDs'.
message PreferredAllocationRequest {
repeated ContainerPreferredAllocationRequest container_requests = 1;
}
message ContainerPreferredAllocationRequest {
// List of available deviceIDs from which to choose a preferred allocation
repeated string available_deviceIDs = 1;
// List of deviceIDs that must be included in the preferred allocation
repeated string must_include_deviceIDs = 2;
// Number of devices to include in the preferred allocation
int32 allocation_size = 3;
}
// PreferredAllocationResponse returns a preferred allocation,
// resulting from a PreferredAllocationRequest.
message PreferredAllocationResponse {
repeated ContainerPreferredAllocationResponse container_responses = 1;
}
message ContainerPreferredAllocationResponse {
repeated string deviceIDs = 1;
}
// - Allocate is expected to be called during pod creation since allocation
// failures for any container would result in pod startup failure.
// - Allocate allows kubelet to exposes additional artifacts in a pod's
@ -162,13 +199,13 @@ message Mount {
// DeviceSpec specifies a host device to mount into a container.
message DeviceSpec {
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
// Path of the device within the container.
string container_path = 1;
// Path of the device on the host.
string host_path = 2;
// Cgroups permissions of the device, candidates are one or more of
// * r - allows container to read from the specified device.
// * w - allows container to write to the specified device.
// * m - allows container to create device files that do not yet exist.
string permissions = 3;
}

View File

@ -1 +1 @@
1.1
1.2

View File

@ -86,7 +86,7 @@ func main() {
}
socketPath := pluginSocksDir + "/dp." + fmt.Sprintf("%d", time.Now().Unix())
dp1 := dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1 := dm.NewDevicePluginStub(devs, socketPath, resourceName, false, false)
if err := dp1.Start(); err != nil {
panic(err)