node: podresources: implement GetAllocatableResources API

Extend the podresources API, implementing the GetAllocatableResources endpoint
as specified in the KEPs:

https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2043-pod-resource-concrete-assigments
https://github.com/kubernetes/enhancements/pull/2404

Signed-off-by: Francesco Romani <fromani@redhat.com>
Francesco Romani 2020-10-28 11:01:43 +01:00
parent 8b79ad6533
commit 6d33354e4c
20 changed files with 380 additions and 36 deletions
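
For illustration only (not part of this commit), a minimal consumer of the new endpoint might look like the sketch below. It assumes the kubelet's default pod-resources socket path (configurable on real nodes) and a grpc-go version whose dialer understands unix:// targets; everything here beyond the generated v1 API is an assumption.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

func main() {
	// Assumption: the default pod-resources socket path.
	conn, err := grpc.Dial("unix:///var/lib/kubelet/pod-resources/kubelet.sock", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// PodResourcesLister is the service this commit extends with GetAllocatableResources.
	client := podresourcesapi.NewPodResourcesListerClient(conn)
	resp, err := client.GetAllocatableResources(ctx, &podresourcesapi.AllocatableResourcesRequest{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("allocatable CPUs:", resp.GetCpuIds())
	for _, dev := range resp.GetDevices() {
		fmt.Println("allocatable device:", dev.GetResourceName(), dev.GetDeviceIds())
	}
}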


@@ -70,3 +70,33 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
PodResources: podResources,
}, nil
}
// GetAllocatableResources returns information about all the resources known to the server - this is more like the capacity than the current amount of free resources.
func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req *v1.AllocatableResourcesRequest) (*v1.AllocatableResourcesResponse, error) {
metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()
allDevices := p.devicesProvider.GetAllocatableDevices()
var respDevs []*v1.ContainerDevices
for resourceName, resourceDevs := range allDevices {
for devID, dev := range resourceDevs {
for _, node := range dev.GetTopology().GetNodes() {
numaNode := node.GetID()
respDevs = append(respDevs, &v1.ContainerDevices{
ResourceName: resourceName,
DeviceIds: []string{devID},
Topology: &v1.TopologyInfo{
Nodes: []*v1.NUMANode{
{ID: numaNode},
},
},
})
}
}
}
return &v1.AllocatableResourcesResponse{
Devices: respDevs,
CpuIds: p.cpusProvider.GetAllocatableCPUs().ToSliceNoSortInt64(),
}, nil
}
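
To make the response shape concrete: the loop above emits one single-device ContainerDevices entry per (device, NUMA node) pair. A sketch with made-up values - a hypothetical resource "vendor.com/gpu" exposing one device "dev0" on NUMA node 0 would be reported roughly as:

resp := &v1.AllocatableResourcesResponse{
	CpuIds: []int64{0, 1, 2, 3}, // hypothetical allocatable CPU ids
	Devices: []*v1.ContainerDevices{
		{
			ResourceName: "vendor.com/gpu", // hypothetical resource name
			DeviceIds:    []string{"dev0"},
			Topology: &v1.TopologyInfo{
				Nodes: []*v1.NUMANode{{ID: 0}},
			},
		},
	},
}

Note also that, as written, a device whose plugin reported no topology produces no entry at all, since the innermost loop has nothing to range over.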


@@ -25,8 +25,10 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)
func TestListPodResourcesV1(t *testing.T) {
@@ -138,6 +140,8 @@ func TestListPodResourcesV1(t *testing.T) {
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus)
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{})
m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances())
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
@@ -150,6 +154,140 @@ func TestListPodResourcesV1(t *testing.T) {
}
}
func TestAllocatableResources(t *testing.T) {
allDevs := devicemanager.ResourceDeviceInstances{
"resource": {
"dev0": {
ID: "GPU-fef8089b-4820-abfc-e83e-94318197576e",
Health: "Healthy",
Topology: &pluginapi.TopologyInfo{
Nodes: []*pluginapi.NUMANode{
{
ID: 0,
},
},
},
},
"dev1": {
ID: "VF-8536e1e8-9dc6-4645-9aea-882db92e31e7",
Health: "Healthy",
Topology: &pluginapi.TopologyInfo{
Nodes: []*pluginapi.NUMANode{
{
ID: 1,
},
},
},
},
},
}
allCPUs := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
for _, tc := range []struct {
desc string
allCPUs cpuset.CPUSet
allDevices devicemanager.ResourceDeviceInstances
expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse
}{
{
desc: "no devices, no CPUs",
allCPUs: cpuset.CPUSet{},
allDevices: devicemanager.NewResourceDeviceInstances(),
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{},
},
{
desc: "no devices, all CPUs",
allCPUs: allCPUs,
allDevices: devicemanager.NewResourceDeviceInstances(),
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
CpuIds: allCPUs.ToSliceNoSortInt64(),
},
},
{
desc: "with devices, all CPUs",
allCPUs: allCPUs,
allDevices: allDevs,
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
CpuIds: allCPUs.ToSliceNoSortInt64(),
Devices: []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource",
DeviceIds: []string{"dev1"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
},
},
},
{
desc: "with devices, no CPUs",
allCPUs: cpuset.CPUSet{},
allDevices: allDevs,
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
Devices: []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource",
DeviceIds: []string{"dev1"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
m := new(mockProvider)
m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{})
m.On("GetCPUs", "", "").Return(cpuset.CPUSet{})
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableDevices").Return(tc.allDevices)
m.On("GetAllocatableCPUs").Return(tc.allCPUs)
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
if !equalAllocatableResourcesResponse(tc.expectedAllocatableResourcesResponse, resp) {
t.Errorf("want resp = %s, got %s", tc.expectedAllocatableResourcesResponse.String(), resp.String())
}
})
}
}
func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) bool {
if len(respA.PodResources) != len(respB.PodResources) {
return false
@@ -177,29 +315,52 @@ func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) b
return false
}
-if len(cntA.Devices) != len(cntB.Devices) {
+if !equalContainerDevices(cntA.Devices, cntB.Devices) {
return false
}
-for kdx := 0; kdx < len(cntA.Devices); kdx++ {
-cntDevA := cntA.Devices[kdx]
-cntDevB := cntB.Devices[kdx]
-if cntDevA.ResourceName != cntDevB.ResourceName {
-return false
-}
-if !equalTopology(cntDevA.Topology, cntDevB.Topology) {
-return false
-}
-if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) {
-return false
-}
-}
}
}
return true
}
func equalContainerDevices(devA, devB []*podresourcesapi.ContainerDevices) bool {
if len(devA) != len(devB) {
return false
}
// the ordering of container devices in the response is not defined,
// so we need to do a full scan, failing at the first mismatch
for idx := 0; idx < len(devA); idx++ {
if !containsContainerDevice(devA[idx], devB) {
return false
}
}
return true
}
func containsContainerDevice(cntDev *podresourcesapi.ContainerDevices, devs []*podresourcesapi.ContainerDevices) bool {
for idx := 0; idx < len(devs); idx++ {
if equalContainerDevice(cntDev, devs[idx]) {
return true
}
}
return false
}
func equalContainerDevice(cntDevA, cntDevB *podresourcesapi.ContainerDevices) bool {
if cntDevA.ResourceName != cntDevB.ResourceName {
return false
}
if !equalTopology(cntDevA.Topology, cntDevB.Topology) {
return false
}
if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) {
return false
}
return true
}
func equalInt64s(a, b []int64) bool {
if len(a) != len(b) {
return false
@@ -231,3 +392,10 @@ func equalTopology(a, b *podresourcesapi.TopologyInfo) bool {
}
return reflect.DeepEqual(a, b)
}
func equalAllocatableResourcesResponse(respA, respB *podresourcesapi.AllocatableResourcesResponse) bool {
if !equalInt64s(respA.CpuIds, respB.CpuIds) {
return false
}
return equalContainerDevices(respA.Devices, respB.Devices)
}


@@ -28,6 +28,7 @@ import (
podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)
type mockProvider struct {
@@ -53,6 +54,16 @@ func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}
func (m *mockProvider) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
args := m.Called()
return args.Get(0).(devicemanager.ResourceDeviceInstances)
}
func (m *mockProvider) GetAllocatableCPUs() cpuset.CPUSet {
args := m.Called()
return args.Get(0).(cpuset.CPUSet)
}
func TestListPodResourcesV1alpha1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"


@@ -20,12 +20,17 @@ import (
"k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)
// DevicesProvider knows how to provide the devices used by the given container
type DevicesProvider interface {
-GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// GetAllocatableDevices returns information about all the devices known to the manager
GetAllocatableDevices() devicemanager.ResourceDeviceInstances
}
// PodsProvider knows how to provide the pods admitted by the node
@@ -35,5 +40,8 @@ type PodsProvider interface {
// CPUsProvider knows how to provide the cpus used by the given container
type CPUsProvider interface {
// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) cpuset.CPUSet
// GetAllocatableCPUs returns the allocatable (not allocated) CPUs
GetAllocatableCPUs() cpuset.CPUSet
}
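
To sketch how these interfaces compose: any type satisfying CPUsProvider (and likewise DevicesProvider) can back the podresources server. A minimal, hypothetical CPUsProvider - not part of this change - could look like:

// staticCPUsProvider is an illustrative CPUsProvider backed by a fixed cpuset.
type staticCPUsProvider struct {
	allocatable cpuset.CPUSet
}

// GetCPUs reports no per-container exclusive assignments in this sketch.
func (p *staticCPUsProvider) GetCPUs(podUID, containerName string) cpuset.CPUSet {
	return cpuset.CPUSet{}
}

// GetAllocatableCPUs returns a copy so callers cannot mutate internal state.
func (p *staticCPUsProvider) GetAllocatableCPUs() cpuset.CPUSet {
	return p.allocatable.Clone()
}

In the kubelet proper, containerManagerImpl satisfies both provider interfaces (see the container manager hunks below) and is what gets handed to NewV1PodResourcesServer.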


@@ -26,8 +26,8 @@ import (
// TODO: Migrate kubelet to either use its own internal objects or client library.
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -103,12 +103,6 @@ type ContainerManager interface {
// registration.
GetPluginRegistrationHandler() cache.PluginHandler
-// GetDevices returns information about the devices assigned to pods and containers
-GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
-// GetCPUs returns information about the cpus assigned to pods and containers
-GetCPUs(podUID, containerName string) cpuset.CPUSet
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool
@@ -116,8 +110,9 @@ type ContainerManager interface {
// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
-// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
-UpdateAllocatedDevices()
// Implements the podresources Provider API for CPUs and Devices
podresources.CPUsProvider
podresources.DevicesProvider
}
type NodeConfig struct {


@@ -1073,10 +1073,18 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr
return cm.deviceManager.GetDevices(podUID, containerName)
}
func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
return cm.deviceManager.GetAllocatableDevices()
}
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) cpuset.CPUSet {
return cm.cpuManager.GetCPUs(podUID, containerName).Clone()
}
func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet {
return cm.cpuManager.GetAllocatableCPUs()
}
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}


@@ -25,6 +25,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -131,6 +132,14 @@ func (cm *containerManagerStub) GetCPUs(_, _ string) cpuset.CPUSet {
return cpuset.CPUSet{}
}
func (cm *containerManagerStub) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
return nil
}
func (cm *containerManagerStub) GetAllocatableCPUs() cpuset.CPUSet {
return cpuset.CPUSet{}
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}


@@ -36,6 +36,7 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -235,3 +236,11 @@ func (cm *containerManagerImpl) UpdateAllocatedDevices() {
func (cm *containerManagerImpl) GetCPUs(_, _ string) cpuset.CPUSet {
return cpuset.CPUSet{}
}
func (cm *containerManagerImpl) GetAllocatableCPUs() cpuset.CPUSet {
return cpuset.CPUSet{}
}
func (cm *containerManagerImpl) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
return nil
}


@@ -85,6 +85,9 @@ type Manager interface {
// and is consulted to achieve NUMA aware resource alignment per Pod
// among this and other resource controllers.
GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
// GetAllocatableCPUs returns the assignable (not allocated) CPUs
GetAllocatableCPUs() cpuset.CPUSet
}
type manager struct {
@@ -124,6 +127,9 @@ type manager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string
// allocatableCPUs is the set of online CPUs as reported by the system
allocatableCPUs cpuset.CPUSet
}
var _ Manager = &manager{}
@@ -150,6 +156,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
return nil, err
}
klog.Infof("[cpumanager] detected CPU topology: %v", topo)
reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
if !ok {
// The static policy cannot initialize without this information.
@@ -210,6 +217,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
return err
}
m.allocatableCPUs = m.policy.GetAllocatableCPUs(m.state)
if m.policy.Name() == string(PolicyNone) {
return nil
}
@@ -296,6 +305,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.
return m.policy.GetPodTopologyHints(m.state, pod)
}
func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
return m.allocatableCPUs.Clone()
}
type reconciledContainer struct {
podName string
containerName string


@@ -120,6 +120,10 @@ func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string]
return nil
}
func (p *mockPolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet {
return cpuset.NewCPUSet()
}
type mockRuntimeService struct {
err error
}


@@ -74,6 +74,11 @@ func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
return cpuset.CPUSet{}
}
func (m *fakeManager) GetAllocatableCPUs() cpuset.CPUSet {
klog.Infof("[fake cpumanager] Get Allocatable Cpus")
return cpuset.CPUSet{}
}
// NewFakeManager creates empty/fake cpu manager
func NewFakeManager() Manager {
return &fakeManager{


@@ -19,6 +19,7 @@ package cpumanager
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@@ -38,4 +39,6 @@ type Policy interface {
// and is consulted to achieve NUMA aware resource alignment per Pod
// among this and other resource controllers.
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
// GetAllocatableCPUs returns the assignable (not allocated) CPUs
GetAllocatableCPUs(m state.State) cpuset.CPUSet
}


@@ -20,6 +20,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@@ -30,7 +31,7 @@ var _ Policy = &nonePolicy{}
// PolicyNone name of none policy
const PolicyNone policyName = "none"
-// NewNonePolicy returns a cupset manager policy that does nothing
+// NewNonePolicy returns a cpuset manager policy that does nothing
func NewNonePolicy() Policy {
return &nonePolicy{}
}
@@ -59,3 +60,12 @@ func (p *nonePolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.
func (p *nonePolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}
// Assignable CPUs are the ones that can be exclusively allocated to pods that meet the exclusivity requirement
// (i.e. guaranteed QoS class and integral CPU request).
// Assignability of CPUs as a concept is only applicable to the static policy, i.e. scenarios where workloads
// CAN get exclusive access to core(s).
// Hence, we return an empty set here: with this policy, no CPUs are assignable according to the definition above.
func (p *nonePolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet {
return cpuset.NewCPUSet()
}


@@ -65,3 +65,24 @@ func TestNonePolicyRemove(t *testing.T) {
t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err)
}
}
func TestNonePolicyGetAllocatableCPUs(t *testing.T) {
// any random topology is fine
var cpuIDs []int
for cpuID := range topoSingleSocketHT.CPUDetails {
cpuIDs = append(cpuIDs, cpuID)
}
policy := &nonePolicy{}
st := &mockState{
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(cpuIDs...),
}
cpus := policy.GetAllocatableCPUs(st)
if cpus.Size() != 0 {
t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus)
}
}


@@ -187,8 +187,8 @@ func (p *staticPolicy) validateState(s state.State) error {
return nil
}
-// assignableCPUs returns the set of unassigned CPUs minus the reserved set.
-func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
+// GetAllocatableCPUs returns the set of unassigned CPUs minus the reserved set.
+func (p *staticPolicy) GetAllocatableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
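
In other words, "allocatable" for the static policy is plain set arithmetic: the shared (default) pool minus the reserved set. A worked example with made-up values:

// Hypothetical node: default pool is CPUs 0-7, with CPUs 0-1 reserved for system daemons.
defaultSet := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7) // s.GetDefaultCPUSet() in the code above
reserved := cpuset.NewCPUSet(0, 1)                     // p.reserved
allocatable := defaultSet.Difference(reserved)         // CPUs 2-7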
@@ -258,14 +258,14 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
-assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)
+allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs)
// If there are aligned CPUs in numaAffinity, attempt to take those first.
result := cpuset.NewCPUSet()
if numaAffinity != nil {
alignedCPUs := cpuset.NewCPUSet()
for _, numaNodeID := range numaAffinity.GetBits() {
-alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
+alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
}
numAlignedToAlloc := alignedCPUs.Size()
@@ -282,7 +282,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
}
// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
-remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
+remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size())
if err != nil {
return cpuset.NewCPUSet(), err
}
@@ -368,7 +368,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
}
// Get a list of available CPUs.
-available := p.assignableCPUs(s)
+available := p.GetAllocatableCPUs(s)
// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
// It should be an empty CPUSet for a newly created pod.
@@ -423,7 +423,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
}
// Get a list of available CPUs.
-available := p.assignableCPUs(s)
+available := p.GetAllocatableCPUs(s)
// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
// It should be an empty CPUSet for a newly created pod.


@@ -85,8 +85,8 @@ type ManagerImpl struct {
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback
-// allDevices is a map by resource name of all the devices currently registered to the device manager
-allDevices map[string]map[string]pluginapi.Device
+// allDevices holds all the devices currently registered to the device manager
+allDevices ResourceDeviceInstances
// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
healthyDevices map[string]sets.String
@@ -1068,6 +1068,15 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
return false
}
// GetAllocatableDevices returns information about all the devices known to the manager
func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances {
m.mutex.Lock()
resp := m.allDevices.Clone()
m.mutex.Unlock()
klog.V(4).Infof("known devices: %d", len(resp))
return resp
}
// GetDevices returns the devices used by the specified container
func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return m.podDevices.getContainerDevices(podUID, containerName)


@@ -93,3 +93,8 @@ func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
func (h *ManagerStub) UpdateAllocatedDevices() {
return
}
// GetAllocatableDevices returns nothing
func (h *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances {
return nil
}


@@ -346,3 +346,21 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresou
}
return cDev
}
// ResourceDeviceInstances maps resource name -> device name -> device data
type ResourceDeviceInstances map[string]map[string]pluginapi.Device
// NewResourceDeviceInstances returns a new, empty ResourceDeviceInstances.
func NewResourceDeviceInstances() ResourceDeviceInstances {
return make(ResourceDeviceInstances)
}
// Clone returns a copy of the registered devices. Note the copy is one level
// deep: Device values are copied, but pointer fields inside them (e.g.
// Topology) still alias the plugin-provided originals.
func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances {
clone := NewResourceDeviceInstances()
for resourceName, resourceDevs := range rdev {
clone[resourceName] = make(map[string]pluginapi.Device)
for devID, dev := range resourceDevs {
clone[resourceName][devID] = dev
}
}
return clone
}
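
A usage note on why GetAllocatableDevices (above, in ManagerImpl) hands out a Clone rather than the live map: the caller may iterate, or even mutate, the result without racing the device manager's own updates. An illustrative fragment, assuming mgr is a *devicemanager.ManagerImpl and fmt is imported:

devs := mgr.GetAllocatableDevices()
for resourceName, instances := range devs {
	for devID := range instances {
		fmt.Println(resourceName, devID)
	}
}
delete(devs, "vendor.com/gpu") // safe: this touches the clone, not the manager's map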


@@ -77,6 +77,9 @@ type Manager interface {
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
// GetAllocatableDevices returns information about all the devices known to the manager
GetAllocatableDevices() ResourceDeviceInstances
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.


@@ -26,6 +26,7 @@ import (
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -202,3 +203,17 @@ func (cm *FakeContainerManager) GetCPUs(_, _ string) cpuset.CPUSet {
cm.CalledFunctions = append(cm.CalledFunctions, "GetCPUs")
return cpuset.CPUSet{}
}
func (cm *FakeContainerManager) GetAllocatableDevices() devicemanager.ResourceDeviceInstances {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableDevices")
return nil
}
func (cm *FakeContainerManager) GetAllocatableCPUs() cpuset.CPUSet {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableCPUs")
return cpuset.CPUSet{}
}