Merge pull request #95734 from fromanirh/podresources-concrete-resources-apis

podresources APIs: concrete resources apis: implement GetAllocatableResources
Committed by Kubernetes Prow Robot on 2021-03-09 14:29:04 -08:00 (via GitHub)
commit 770a9504ea
31 changed files with 1995 additions and 117 deletions
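
For context, the new GetAllocatableResources RPC is served on the same kubelet pod-resources gRPC socket as List. A minimal client sketch, assuming the default socket path /var/lib/kubelet/pod-resources/kubelet.sock and the KubeletPodResourcesGetAllocatable gate enabled on the node (both assumptions, not part of this diff):

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"google.golang.org/grpc"
	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

func main() {
	// Assumed default pod-resources socket path; adjust for your node.
	const socket = "/var/lib/kubelet/pod-resources/kubelet.sock"

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Dial the kubelet over the unix socket.
	conn, err := grpc.DialContext(ctx, socket,
		grpc.WithInsecure(),
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
		}),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := podresourcesapi.NewPodResourcesListerClient(conn)
	resp, err := client.GetAllocatableResources(ctx, &podresourcesapi.AllocatableResourcesRequest{})
	if err != nil {
		// Also hit when the feature gate is disabled on the kubelet.
		panic(err)
	}
	fmt.Printf("allocatable CPUs: %v\n", resp.CpuIds)
	for _, dev := range resp.Devices {
		fmt.Printf("resource %q devices %v\n", dev.ResourceName, dev.DeviceIds)
	}
}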

View File

@ -730,6 +730,12 @@ const (
// Allows jobs to be created in the suspended state.
SuspendJob featuregate.Feature = "SuspendJob"
// owner: @fromanirh
// alpha: v1.21
//
// Enable POD resources API to return allocatable resources
KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable"
// owner: @jayunit100 @abhiraut @rikatz
// beta: v1.21
//
@ -845,6 +851,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
IngressClassNamespacedParams: {Default: false, PreRelease: featuregate.Alpha},
ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha},
SuspendJob: {Default: false, PreRelease: featuregate.Alpha},
KubeletPodResourcesGetAllocatable: {Default: false, PreRelease: featuregate.Alpha},
NamespaceDefaultLabelName: {Default: true, PreRelease: featuregate.Beta}, // graduate to GA and lock to default in 1.22, remove in 1.24
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
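
Note that KubeletPodResourcesGetAllocatable is alpha and disabled by default in v1.21, so the new endpoint has to be enabled explicitly on each node, for example with the kubelet flag --feature-gates=KubeletPodResourcesGetAllocatable=true or the corresponding featureGates entry in the KubeletConfiguration.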

View File

@ -18,7 +18,10 @@ package podresources
import (
"context"
"fmt"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubelet/pkg/apis/podresources/v1"
@ -44,6 +47,7 @@ func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesP
// List returns information about the resources assigned to pods on the node
func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResourcesRequest) (*v1.ListPodResourcesResponse, error) {
metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()
metrics.PodResourcesEndpointRequestsListCount.WithLabelValues("v1").Inc()
pods := p.podsProvider.GetPods()
podResources := make([]*v1.PodResources, len(pods))
@ -70,3 +74,21 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
PodResources: podResources,
}, nil
}
// GetAllocatableResources returns information about all the resources known to the server - this is more like the capacity than the current amount of free resources.
func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req *v1.AllocatableResourcesRequest) (*v1.AllocatableResourcesResponse, error) {
metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc()
metrics.PodResourcesEndpointRequestsGetAllocatableCount.WithLabelValues("v1").Inc()
if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesGetAllocatable) {
metrics.PodResourcesEndpointErrorsGetAllocatableCount.WithLabelValues("v1").Inc()
return nil, fmt.Errorf("Pod Resources API GetAllocatableResources disabled")
}
return &v1.AllocatableResourcesResponse{
Devices: p.devicesProvider.GetAllocatableDevices(),
CpuIds: p.cpusProvider.GetAllocatableCPUs(),
}, nil
}
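
Note the contrast with List above: List reports the devices and CPUs actually assigned to each running container, while GetAllocatableResources reports the node-level pool the kubelet could hand out (allocatable devices and the exclusive-CPU pool), regardless of current assignments. When the feature gate is off, the call fails and the GetAllocatable errors counter is incremented in addition to the request counters, which presumably lets operators spot clients probing a disabled endpoint.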

View File

@ -18,12 +18,19 @@ package podresources
import (
"context"
"reflect"
"sort"
"testing"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)
func TestListPodResourcesV1(t *testing.T) {
@ -135,14 +142,335 @@ func TestListPodResourcesV1(t *testing.T) {
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus)
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{})
m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances())
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
if tc.expectedResponse.String() != resp.String() {
if !equalListResponse(tc.expectedResponse, resp) {
t.Errorf("want resp = %s, got %s", tc.expectedResponse.String(), resp.String())
}
})
}
}
func TestAllocatableResources(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesGetAllocatable, true)()
allDevs := []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource",
DeviceIds: []string{"dev1"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
{
ResourceName: "resource-nt",
DeviceIds: []string{"devA"},
},
{
ResourceName: "resource-mm",
DeviceIds: []string{"devM0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource-mm",
DeviceIds: []string{"devMM"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
{
ID: 1,
},
},
},
},
}
allCPUs := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
for _, tc := range []struct {
desc string
allCPUs []int64
allDevices []*podresourcesapi.ContainerDevices
expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse
}{
{
desc: "no devices, no CPUs",
allCPUs: []int64{},
allDevices: []*podresourcesapi.ContainerDevices{},
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{},
},
{
desc: "no devices, all CPUs",
allCPUs: allCPUs,
allDevices: []*podresourcesapi.ContainerDevices{},
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
CpuIds: allCPUs,
},
},
{
desc: "with devices, all CPUs",
allCPUs: allCPUs,
allDevices: allDevs,
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
CpuIds: allCPUs,
Devices: []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource",
DeviceIds: []string{"dev1"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
{
ResourceName: "resource-nt",
DeviceIds: []string{"devA"},
},
{
ResourceName: "resource-mm",
DeviceIds: []string{"devM0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource-mm",
DeviceIds: []string{"devMM"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
{
ID: 1,
},
},
},
},
},
},
},
{
desc: "with devices, no CPUs",
allCPUs: []int64{},
allDevices: allDevs,
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
Devices: []*podresourcesapi.ContainerDevices{
{
ResourceName: "resource",
DeviceIds: []string{"dev0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource",
DeviceIds: []string{"dev1"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
{
ResourceName: "resource-nt",
DeviceIds: []string{"devA"},
},
{
ResourceName: "resource-mm",
DeviceIds: []string{"devM0"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
ResourceName: "resource-mm",
DeviceIds: []string{"devMM"},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
{
ID: 1,
},
},
},
},
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
m := new(mockProvider)
m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{})
m.On("GetCPUs", "", "").Return([]int64{})
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableDevices").Return(tc.allDevices)
m.On("GetAllocatableCPUs").Return(tc.allCPUs)
server := NewV1PodResourcesServer(m, m, m)
resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
}
if !equalAllocatableResourcesResponse(tc.expectedAllocatableResourcesResponse, resp) {
t.Errorf("want resp = %s, got %s", tc.expectedAllocatableResourcesResponse.String(), resp.String())
}
})
}
}
func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) bool {
if len(respA.PodResources) != len(respB.PodResources) {
return false
}
for idx := 0; idx < len(respA.PodResources); idx++ {
podResA := respA.PodResources[idx]
podResB := respB.PodResources[idx]
if podResA.Name != podResB.Name {
return false
}
if podResA.Namespace != podResB.Namespace {
return false
}
if len(podResA.Containers) != len(podResB.Containers) {
return false
}
for jdx := 0; jdx < len(podResA.Containers); jdx++ {
cntA := podResA.Containers[jdx]
cntB := podResB.Containers[jdx]
if cntA.Name != cntB.Name {
return false
}
if !equalInt64s(cntA.CpuIds, cntB.CpuIds) {
return false
}
if !equalContainerDevices(cntA.Devices, cntB.Devices) {
return false
}
}
}
return true
}
func equalContainerDevices(devA, devB []*podresourcesapi.ContainerDevices) bool {
if len(devA) != len(devB) {
return false
}
for idx := 0; idx < len(devA); idx++ {
cntDevA := devA[idx]
cntDevB := devB[idx]
if cntDevA.ResourceName != cntDevB.ResourceName {
return false
}
if !equalTopology(cntDevA.Topology, cntDevB.Topology) {
return false
}
if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) {
return false
}
}
return true
}
func equalInt64s(a, b []int64) bool {
if len(a) != len(b) {
return false
}
aCopy := append([]int64{}, a...)
sort.Slice(aCopy, func(i, j int) bool { return aCopy[i] < aCopy[j] })
bCopy := append([]int64{}, b...)
sort.Slice(bCopy, func(i, j int) bool { return bCopy[i] < bCopy[j] })
return reflect.DeepEqual(aCopy, bCopy)
}
func equalStrings(a, b []string) bool {
if len(a) != len(b) {
return false
}
aCopy := append([]string{}, a...)
sort.Strings(aCopy)
bCopy := append([]string{}, b...)
sort.Strings(bCopy)
return reflect.DeepEqual(aCopy, bCopy)
}
func equalTopology(a, b *podresourcesapi.TopologyInfo) bool {
if a == nil && b != nil {
return false
}
if a != nil && b == nil {
return false
}
return reflect.DeepEqual(a, b)
}
func equalAllocatableResourcesResponse(respA, respB *podresourcesapi.AllocatableResourcesResponse) bool {
if !equalInt64s(respA.CpuIds, respB.CpuIds) {
return false
}
return equalContainerDevices(respA.Devices, respB.Devices)
}

View File

@ -52,6 +52,16 @@ func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}
func (m *mockProvider) GetAllocatableDevices() []*podresourcesv1.ContainerDevices {
args := m.Called()
return args.Get(0).([]*podresourcesv1.ContainerDevices)
}
func (m *mockProvider) GetAllocatableCPUs() []int64 {
args := m.Called()
return args.Get(0).([]int64)
}
func TestListPodResourcesV1alpha1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"

View File

@ -23,8 +23,12 @@ import (
// DevicesProvider knows how to provide the devices used by the given container
type DevicesProvider interface {
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// GetAllocatableDevices returns information about all the devices known to the manager
GetAllocatableDevices() []*podresourcesapi.ContainerDevices
}
// PodsProvider knows how to provide the pods admitted by the node
@ -34,5 +38,8 @@ type PodsProvider interface {
// CPUsProvider knows how to provide the cpus used by the given container
type CPUsProvider interface {
// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) []int64
// GetAllocatableCPUs returns the allocatable (not allocated) CPUs
GetAllocatableCPUs() []int64
}
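
To illustrate how these interfaces are meant to be satisfied outside the kubelet's real ContainerManager (which the later diffs wire up), here is a minimal sketch of a static provider; the type name and fixed data are hypothetical:

package example

import (
	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// staticProvider serves fixed data; it satisfies both DevicesProvider and CPUsProvider.
type staticProvider struct {
	allocatableDevices []*podresourcesapi.ContainerDevices
	allocatableCPUs    []int64
}

// UpdateAllocatedDevices is a no-op: there are no terminated pods to clean up after.
func (s *staticProvider) UpdateAllocatedDevices() {}

// GetDevices returns no per-container assignments in this sketch.
func (s *staticProvider) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
	return nil
}

func (s *staticProvider) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
	return s.allocatableDevices
}

// GetCPUs returns no per-container assignments in this sketch.
func (s *staticProvider) GetCPUs(podUID, containerName string) []int64 {
	return nil
}

func (s *staticProvider) GetAllocatableCPUs() []int64 {
	return s.allocatableCPUs
}

A PodsProvider (for List) would be supplied separately, and the three providers are passed to NewV1PodResourcesServer, exactly as the tests above do with a single mock.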

View File

@ -28,7 +28,9 @@ import (
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -103,12 +105,6 @@ type ContainerManager interface {
// registration.
GetPluginRegistrationHandler() cache.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
// GetCPUs returns information about the cpus assigned to pods and containers
GetCPUs(podUID, containerName string) []int64
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool
@ -116,8 +112,9 @@ type ContainerManager interface {
// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
UpdateAllocatedDevices()
// Implements the podresources Provider API for CPUs and Devices
podresources.CPUsProvider
podresources.DevicesProvider
}
type NodeConfig struct {
@ -191,3 +188,39 @@ func ParseQOSReserved(m map[string]string) (*map[v1.ResourceName]int64, error) {
}
return &reservations, nil
}
func containerDevicesFromResourceDeviceInstances(devs devicemanager.ResourceDeviceInstances) []*podresourcesapi.ContainerDevices {
var respDevs []*podresourcesapi.ContainerDevices
for resourceName, resourceDevs := range devs {
for devID, dev := range resourceDevs {
topo := dev.GetTopology()
if topo == nil {
// Some device plugins do not report topology information.
// This is legal, so we report the devices anyway,
// let the client decide what to do.
respDevs = append(respDevs, &podresourcesapi.ContainerDevices{
ResourceName: resourceName,
DeviceIds: []string{devID},
})
continue
}
for _, node := range topo.GetNodes() {
respDevs = append(respDevs, &podresourcesapi.ContainerDevices{
ResourceName: resourceName,
DeviceIds: []string{devID},
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: node.GetID(),
},
},
},
})
}
}
}
return respDevs
}
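
To make the flattening concrete: a device attached to two NUMA nodes is reported once per node, while a device without topology is reported once with no Topology set. A hypothetical test sketch (the helper is unexported, so this would live in the cm package; the test name and sample values are assumptions):

package cm

import (
	"testing"

	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
)

func TestContainerDevicesFlattening(t *testing.T) {
	// One resource with a single device attached to NUMA nodes 0 and 1.
	in := devicemanager.ResourceDeviceInstances{
		"vendor.com/resource-mm": devicemanager.DeviceInstances{
			"devMM": pluginapi.Device{
				Topology: &pluginapi.TopologyInfo{
					Nodes: []*pluginapi.NUMANode{{ID: 0}, {ID: 1}},
				},
			},
		},
	}

	out := containerDevicesFromResourceDeviceInstances(in)
	// Expect two entries for the same device, one per NUMA node.
	if len(out) != 2 {
		t.Fatalf("expected 2 ContainerDevices entries, got %d: %v", len(out), out)
	}
}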

View File

@ -1070,13 +1070,21 @@ func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceLi
}
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return cm.deviceManager.GetDevices(podUID, containerName)
return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName))
}
func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
}
func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64()
}
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
return cm.cpuManager.GetAllocatableCPUs().ToSliceNoSortInt64()
}
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}

View File

@ -114,6 +114,10 @@ func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.Conta
return nil
}
func (cm *containerManagerStub) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
return nil
}
func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}
@ -130,6 +134,10 @@ func (cm *containerManagerStub) GetCPUs(_, _ string) []int64 {
return nil
}
func (cm *containerManagerStub) GetAllocatableCPUs() []int64 {
return nil
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}

View File

@ -217,7 +217,11 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string {
}
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return cm.deviceManager.GetDevices(podUID, containerName)
return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName))
}
func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
return nil
}
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
@ -235,3 +239,7 @@ func (cm *containerManagerImpl) UpdateAllocatedDevices() {
func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
return nil
}
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
return nil
}

View File

@ -85,6 +85,9 @@ type Manager interface {
// and is consulted to achieve NUMA aware resource alignment per Pod
// among this and other resource controllers.
GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
// GetAllocatableCPUs returns the assignable (not allocated) CPUs
GetAllocatableCPUs() cpuset.CPUSet
}
type manager struct {
@ -124,6 +127,9 @@ type manager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string
// allocatableCPUs is the set of online CPUs as reported by the system
allocatableCPUs cpuset.CPUSet
}
var _ Manager = &manager{}
@ -150,6 +156,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
return nil, err
}
klog.Infof("[cpumanager] detected CPU topology: %v", topo)
reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
if !ok {
// The static policy cannot initialize without this information.
@ -210,6 +217,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
return err
}
m.allocatableCPUs = m.policy.GetAllocatableCPUs(m.state)
if m.policy.Name() == string(PolicyNone) {
return nil
}
@ -296,6 +305,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.
return m.policy.GetPodTopologyHints(m.state, pod)
}
func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
return m.allocatableCPUs.Clone()
}
type reconciledContainer struct {
podName string
containerName string

View File

@ -120,6 +120,10 @@ func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string]
return nil
}
func (p *mockPolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet {
return cpuset.NewCPUSet()
}
type mockRuntimeService struct {
err error
}

View File

@ -74,6 +74,11 @@ func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
return cpuset.CPUSet{}
}
func (m *fakeManager) GetAllocatableCPUs() cpuset.CPUSet {
klog.Infof("[fake cpumanager] Get Allocatable Cpus")
return cpuset.CPUSet{}
}
// NewFakeManager creates empty/fake cpu manager
func NewFakeManager() Manager {
return &fakeManager{

View File

@ -19,6 +19,7 @@ package cpumanager
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@ -38,4 +39,6 @@ type Policy interface {
// and is consulted to achieve NUMA aware resource alignment per Pod
// among this and other resource controllers.
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
// GetAllocatableCPUs returns the assignable (not allocated) CPUs
GetAllocatableCPUs(m state.State) cpuset.CPUSet
}

View File

@ -20,6 +20,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
@ -30,7 +31,7 @@ var _ Policy = &nonePolicy{}
// PolicyNone name of none policy
const PolicyNone policyName = "none"
// NewNonePolicy returns a cupset manager policy that does nothing
// NewNonePolicy returns a cpuset manager policy that does nothing
func NewNonePolicy() Policy {
return &nonePolicy{}
}
@ -59,3 +60,12 @@ func (p *nonePolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.
func (p *nonePolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}
// Assignable CPUs are the ones that can be exclusively allocated to pods that meet the exclusivity requirement
// (i.e. guaranteed QoS class and integral CPU request).
// Assignability of CPUs as a concept is only applicable in case of the static policy, i.e. scenarios where workloads
// CAN get exclusive access to core(s).
// Hence, we return an empty set here: no CPUs are assignable according to the above definition with this policy.
func (p *nonePolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet {
return cpuset.NewCPUSet()
}

View File

@ -65,3 +65,24 @@ func TestNonePolicyRemove(t *testing.T) {
t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err)
}
}
func TestNonePolicyGetAllocatableCPUs(t *testing.T) {
// any random topology is fine
var cpuIDs []int
for cpuID := range topoSingleSocketHT.CPUDetails {
cpuIDs = append(cpuIDs, cpuID)
}
policy := &nonePolicy{}
st := &mockState{
assignments: state.ContainerCPUAssignments{},
defaultCPUSet: cpuset.NewCPUSet(cpuIDs...),
}
cpus := policy.GetAllocatableCPUs(st)
if cpus.Size() != 0 {
t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus)
}
}

View File

@ -187,8 +187,8 @@ func (p *staticPolicy) validateState(s state.State) error {
return nil
}
// assignableCPUs returns the set of unassigned CPUs minus the reserved set.
func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet {
// GetAllocatableCPUs returns the set of unassigned CPUs minus the reserved set.
func (p *staticPolicy) GetAllocatableCPUs(s state.State) cpuset.CPUSet {
return s.GetDefaultCPUSet().Difference(p.reserved)
}
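
A worked example of the set arithmetic (illustrative CPU numbers): with a default set of {0..7} and a reserved set of {0, 1}, the allocatable pool is {2..7}. CPUs already granted exclusively to containers have been removed from the default set by the static policy, so they never appear here either.

defaultSet := cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7) // state.GetDefaultCPUSet()
reserved := cpuset.NewCPUSet(0, 1)                     // p.reserved
allocatable := defaultSet.Difference(reserved)         // {2,3,4,5,6,7}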
@ -258,14 +258,14 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa
func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity)
assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs)
allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs)
// If there are aligned CPUs in numaAffinity, attempt to take those first.
result := cpuset.NewCPUSet()
if numaAffinity != nil {
alignedCPUs := cpuset.NewCPUSet()
for _, numaNodeID := range numaAffinity.GetBits() {
alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
}
numAlignedToAlloc := alignedCPUs.Size()
@ -282,7 +282,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit
}
// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size())
remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size())
if err != nil {
return cpuset.NewCPUSet(), err
}
@ -368,7 +368,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v
}
// Get a list of available CPUs.
available := p.assignableCPUs(s)
available := p.GetAllocatableCPUs(s)
// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
// It should be an empty CPUSet for a newly created pod.
@ -423,7 +423,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin
}
// Get a list of available CPUs.
available := p.assignableCPUs(s)
available := p.GetAllocatableCPUs(s)
// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
// It should be an empty CPUSet for a newly created pod.

View File

@ -75,6 +75,15 @@ func NewCPUSet(cpus ...int) CPUSet {
return b.Result()
}
// NewCPUSetInt64 returns a new CPUSet containing the supplied elements, as a slice of int64.
func NewCPUSetInt64(cpus ...int64) CPUSet {
b := NewBuilder()
for _, c := range cpus {
b.Add(int(c))
}
return b.Result()
}
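
A quick usage sketch for the new helper, which mirrors NewCPUSet but accepts the int64 IDs used throughout the pod resources API:

ids := []int64{0, 2, 4}
set := cpuset.NewCPUSetInt64(ids...) // CPUSet containing 0, 2, 4
back := set.ToSliceNoSortInt64()     // back to []int64 for an API response
_ = back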
// Size returns the number of elements in this set.
func (s CPUSet) Size() int {
return len(s.elems)

View File

@ -37,7 +37,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@ -85,8 +84,8 @@ type ManagerImpl struct {
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback
// allDevices is a map by resource name of all the devices currently registered to the device manager
allDevices map[string]map[string]pluginapi.Device
// allDevices holds all the devices currently registered to the device manager
allDevices ResourceDeviceInstances
// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
healthyDevices map[string]sets.String
@ -152,7 +151,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
socketname: file,
socketdir: dir,
allDevices: make(map[string]map[string]pluginapi.Device),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
@ -1068,8 +1067,17 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
return false
}
// GetAllocatableDevices returns information about all the devices known to the manager
func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances {
m.mutex.Lock()
resp := m.allDevices.Clone()
m.mutex.Unlock()
klog.V(4).Infof("known devices: %d", len(resp))
return resp
}
// GetDevices returns the devices used by the specified container
func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceInstances {
return m.podDevices.getContainerDevices(podUID, containerName)
}

View File

@ -18,7 +18,6 @@ package devicemanager
import (
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -80,7 +79,12 @@ func (h *ManagerStub) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana
}
// GetDevices returns nil
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
func (h *ManagerStub) GetDevices(_, _ string) ResourceDeviceInstances {
return nil
}
// GetAllocatableDevices returns nothing
func (h *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances {
return nil
}

View File

@ -622,7 +622,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
checkpointManager: ckm,
allDevices: make(map[string]map[string]pluginapi.Device),
allDevices: NewResourceDeviceInstances(),
}
for _, res := range testRes {

View File

@ -23,7 +23,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
@ -324,7 +323,7 @@ func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *Devi
}
// getContainerDevices returns the devices assigned to the provided container for all ResourceNames
func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDeviceInstances {
pdev.RLock()
defer pdev.RUnlock()
@ -334,15 +333,51 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresou
if _, contExists := pdev.devs[podUID][contName]; !contExists {
return nil
}
cDev := []*podresourcesapi.ContainerDevices{}
resDev := NewResourceDeviceInstances()
for resource, allocateInfo := range pdev.devs[podUID][contName] {
if len(allocateInfo.deviceIds) == 0 {
continue
}
devicePluginMap := make(map[string]pluginapi.Device)
for numaid, devlist := range allocateInfo.deviceIds {
cDev = append(cDev, &podresourcesapi.ContainerDevices{
ResourceName: resource,
DeviceIds: devlist,
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaid}}},
})
for _, devId := range devlist {
NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
if pDev, ok := devicePluginMap[devId]; ok && pDev.Topology != nil {
if nodes := pDev.Topology.GetNodes(); nodes != nil {
NUMANodes = append(NUMANodes, nodes...)
}
}
devicePluginMap[devId] = pluginapi.Device{
// ID and Healthy are not relevant here.
Topology: &pluginapi.TopologyInfo{
Nodes: NUMANodes,
},
}
}
}
resDev[resource] = devicePluginMap
}
return resDev
}
// DeviceInstances is a mapping device name -> plugin device data
type DeviceInstances map[string]pluginapi.Device
// ResourceDeviceInstances is a mapping resource name -> DeviceInstances
type ResourceDeviceInstances map[string]DeviceInstances
func NewResourceDeviceInstances() ResourceDeviceInstances {
return make(ResourceDeviceInstances)
}
func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances {
clone := NewResourceDeviceInstances()
for resourceName, resourceDevs := range rdev {
clone[resourceName] = make(map[string]pluginapi.Device)
for devID, dev := range resourceDevs {
clone[resourceName][devID] = dev
}
}
return cDev
return clone
}
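
Clone copies both the outer and inner maps (the pluginapi.Device values are copied by value), so GetAllocatableDevices can return a snapshot without exposing the manager's internal map to concurrent mutation. A small sketch of the intended property, assuming imports of fmt, the devicemanager package and the device plugin API:

orig := devicemanager.NewResourceDeviceInstances()
orig["vendor.com/gpu"] = devicemanager.DeviceInstances{"gpu-0": pluginapi.Device{}}

cp := orig.Clone()
delete(cp["vendor.com/gpu"], "gpu-0") // mutate the copy...

_, stillThere := orig["vendor.com/gpu"]["gpu-0"]
fmt.Println(stillThere) // ...the original is untouched: prints true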

View File

@ -35,13 +35,18 @@ func TestGetContainerDevices(t *testing.T) {
devices,
constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
contDevices := podDevices.getContainerDevices(podID, contID)
require.Equal(t, len(devices), len(contDevices), "Incorrect container devices")
for _, contDev := range contDevices {
for _, node := range contDev.Topology.Nodes {
resContDevices := podDevices.getContainerDevices(podID, contID)
contDevices, ok := resContDevices[resourceName1]
require.True(t, ok, "resource %q not present", resourceName1)
for devId, plugInfo := range contDevices {
nodes := plugInfo.GetTopology().GetNodes()
require.Equal(t, len(nodes), len(devices), "Incorrect container devices: %v - %v (nodes %v)", devices, contDevices, nodes)
for _, node := range plugInfo.GetTopology().GetNodes() {
dev, ok := devices[node.ID]
require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID)
require.Equal(t, contDev.DeviceIds[0], dev[0], "Can't find device %s in result", dev[0])
require.Equal(t, devId, dev[0], "Can't find device %s in result", dev[0])
}
}
}

View File

@ -56,7 +56,7 @@ func TestGetTopologyHints(t *testing.T) {
for _, tc := range tcases {
m := ManagerImpl{
allDevices: make(map[string]map[string]pluginapi.Device),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
@ -66,7 +66,7 @@ func TestGetTopologyHints(t *testing.T) {
}
for r := range tc.devices {
m.allDevices[r] = make(map[string]pluginapi.Device)
m.allDevices[r] = make(DeviceInstances)
m.healthyDevices[r] = sets.NewString()
for _, d := range tc.devices[r] {
@ -409,7 +409,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
}
for _, tc := range tcases {
m := ManagerImpl{
allDevices: make(map[string]map[string]pluginapi.Device),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpointInfo),
@ -419,7 +419,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
topologyAffinityStore: &mockAffinityStore{tc.hint},
}
m.allDevices[tc.resource] = make(map[string]pluginapi.Device)
m.allDevices[tc.resource] = make(DeviceInstances)
m.healthyDevices[tc.resource] = sets.NewString()
m.endpoints[tc.resource] = endpointInfo{}
@ -598,7 +598,7 @@ func TestGetPreferredAllocationParameters(t *testing.T) {
}
for _, tc := range tcases {
m := ManagerImpl{
allDevices: make(map[string]map[string]pluginapi.Device),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpointInfo),
@ -608,7 +608,7 @@ func TestGetPreferredAllocationParameters(t *testing.T) {
topologyAffinityStore: &mockAffinityStore{tc.hint},
}
m.allDevices[tc.resource] = make(map[string]pluginapi.Device)
m.allDevices[tc.resource] = make(DeviceInstances)
m.healthyDevices[tc.resource] = sets.NewString()
for _, d := range tc.allDevices {
m.allDevices[tc.resource][d.ID] = d
@ -920,7 +920,7 @@ func TestGetPodTopologyHints(t *testing.T) {
for _, tc := range tcases {
m := ManagerImpl{
allDevices: make(map[string]map[string]pluginapi.Device),
allDevices: NewResourceDeviceInstances(),
healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: newPodDevices(),
@ -930,7 +930,7 @@ func TestGetPodTopologyHints(t *testing.T) {
}
for r := range tc.devices {
m.allDevices[r] = make(map[string]pluginapi.Device)
m.allDevices[r] = make(DeviceInstances)
m.healthyDevices[r] = sets.NewString()
for _, d := range tc.devices[r] {

View File

@ -20,7 +20,6 @@ import (
"time"
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -60,7 +59,10 @@ type Manager interface {
GetWatcherHandler() cache.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
GetDevices(podUID, containerName string) ResourceDeviceInstances
// GetAllocatableDevices returns information about all the devices known to the manager
GetAllocatableDevices() ResourceDeviceInstances
// ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not,
// depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates

View File

@ -174,6 +174,13 @@ func (cm *FakeContainerManager) GetDevices(_, _ string) []*podresourcesapi.Conta
return nil
}
func (cm *FakeContainerManager) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableDevices")
return nil
}
func (cm *FakeContainerManager) ShouldResetExtendedResourceCapacity() bool {
cm.Lock()
defer cm.Unlock()
@ -201,3 +208,9 @@ func (cm *FakeContainerManager) GetCPUs(_, _ string) []int64 {
cm.CalledFunctions = append(cm.CalledFunctions, "GetCPUs")
return nil
}
func (cm *FakeContainerManager) GetAllocatableCPUs() []int64 {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableCPUs")
return nil
}

View File

@ -63,7 +63,11 @@ const (
DevicePluginRegistrationCountKey = "device_plugin_registration_total"
DevicePluginAllocationDurationKey = "device_plugin_alloc_duration_seconds"
// Metrics keys of pod resources operations
PodResourcesEndpointRequestsTotalKey = "pod_resources_endpoint_requests_total"
PodResourcesEndpointRequestsTotalKey = "pod_resources_endpoint_requests_total"
PodResourcesEndpointRequestsListKey = "pod_resources_endpoint_requests_list"
PodResourcesEndpointRequestsGetAllocatableKey = "pod_resources_endpoint_requests_get_allocatable"
PodResourcesEndpointErrorsListKey = "pod_resources_endpoint_errors_list"
PodResourcesEndpointErrorsGetAllocatableKey = "pod_resources_endpoint_errors_get_allocatable"
// Metric keys for node config
AssignedConfigKey = "node_config_assigned"
@ -293,6 +297,54 @@ var (
[]string{"server_api_version"},
)
// PodResourcesEndpointRequestsListCount is a Counter that tracks the number of requests to the PodResource List() endpoint.
// Broken down by server API version.
PodResourcesEndpointRequestsListCount = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: PodResourcesEndpointRequestsListKey,
Help: "Number of requests to the PodResource List endpoint. Broken down by server api version.",
StabilityLevel: metrics.ALPHA,
},
[]string{"server_api_version"},
)
// PodResourcesEndpointRequestsGetAllocatableCount is a Counter that tracks the number of requests to the PodResource GetAllocatableResources() endpoint.
// Broken down by server API version.
PodResourcesEndpointRequestsGetAllocatableCount = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: PodResourcesEndpointRequestsGetAllocatableKey,
Help: "Number of requests to the PodResource GetAllocatableResources endpoint. Broken down by server api version.",
StabilityLevel: metrics.ALPHA,
},
[]string{"server_api_version"},
)
// PodResourcesEndpointErrorsListCount is a Counter that tracks the number of errors returned by the PodResource List() endpoint.
// Broken down by server API version.
PodResourcesEndpointErrorsListCount = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: PodResourcesEndpointErrorsListKey,
Help: "Number of requests to the PodResource List endpoint which returned error. Broken down by server api version.",
StabilityLevel: metrics.ALPHA,
},
[]string{"server_api_version"},
)
// PodResourcesEndpointErrorsGetAllocatableCount is a Counter that tracks the number of errors returned by the PodResource GetAllocatableResources() endpoint.
// Broken down by server API version.
PodResourcesEndpointErrorsGetAllocatableCount = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: KubeletSubsystem,
Name: PodResourcesEndpointErrorsGetAllocatableKey,
Help: "Number of requests to the PodResource GetAllocatableResources endpoint which returned error. Broken down by server api version.",
StabilityLevel: metrics.ALPHA,
},
[]string{"server_api_version"},
)
// Metrics for node config
// AssignedConfig is a Gauge that is set 1 if the Kubelet has a NodeConfig assigned.

View File

@ -45,6 +45,97 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type AllocatableResourcesRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AllocatableResourcesRequest) Reset() { *m = AllocatableResourcesRequest{} }
func (*AllocatableResourcesRequest) ProtoMessage() {}
func (*AllocatableResourcesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{0}
}
func (m *AllocatableResourcesRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AllocatableResourcesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AllocatableResourcesRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *AllocatableResourcesRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_AllocatableResourcesRequest.Merge(m, src)
}
func (m *AllocatableResourcesRequest) XXX_Size() int {
return m.Size()
}
func (m *AllocatableResourcesRequest) XXX_DiscardUnknown() {
xxx_messageInfo_AllocatableResourcesRequest.DiscardUnknown(m)
}
var xxx_messageInfo_AllocatableResourcesRequest proto.InternalMessageInfo
// AllocatableResourcesResponse contains information about all the devices known by the kubelet
type AllocatableResourcesResponse struct {
Devices []*ContainerDevices `protobuf:"bytes,1,rep,name=devices,proto3" json:"devices,omitempty"`
CpuIds []int64 `protobuf:"varint,2,rep,packed,name=cpu_ids,json=cpuIds,proto3" json:"cpu_ids,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AllocatableResourcesResponse) Reset() { *m = AllocatableResourcesResponse{} }
func (*AllocatableResourcesResponse) ProtoMessage() {}
func (*AllocatableResourcesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{1}
}
func (m *AllocatableResourcesResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *AllocatableResourcesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_AllocatableResourcesResponse.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *AllocatableResourcesResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_AllocatableResourcesResponse.Merge(m, src)
}
func (m *AllocatableResourcesResponse) XXX_Size() int {
return m.Size()
}
func (m *AllocatableResourcesResponse) XXX_DiscardUnknown() {
xxx_messageInfo_AllocatableResourcesResponse.DiscardUnknown(m)
}
var xxx_messageInfo_AllocatableResourcesResponse proto.InternalMessageInfo
func (m *AllocatableResourcesResponse) GetDevices() []*ContainerDevices {
if m != nil {
return m.Devices
}
return nil
}
func (m *AllocatableResourcesResponse) GetCpuIds() []int64 {
if m != nil {
return m.CpuIds
}
return nil
}
// ListPodResourcesRequest is the request made to the PodResourcesLister service
type ListPodResourcesRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -54,7 +145,7 @@ type ListPodResourcesRequest struct {
func (m *ListPodResourcesRequest) Reset() { *m = ListPodResourcesRequest{} }
func (*ListPodResourcesRequest) ProtoMessage() {}
func (*ListPodResourcesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{0}
return fileDescriptor_00212fb1f9d3bf1c, []int{2}
}
func (m *ListPodResourcesRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -93,7 +184,7 @@ type ListPodResourcesResponse struct {
func (m *ListPodResourcesResponse) Reset() { *m = ListPodResourcesResponse{} }
func (*ListPodResourcesResponse) ProtoMessage() {}
func (*ListPodResourcesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{1}
return fileDescriptor_00212fb1f9d3bf1c, []int{3}
}
func (m *ListPodResourcesResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -141,7 +232,7 @@ type PodResources struct {
func (m *PodResources) Reset() { *m = PodResources{} }
func (*PodResources) ProtoMessage() {}
func (*PodResources) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{2}
return fileDescriptor_00212fb1f9d3bf1c, []int{4}
}
func (m *PodResources) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -203,7 +294,7 @@ type ContainerResources struct {
func (m *ContainerResources) Reset() { *m = ContainerResources{} }
func (*ContainerResources) ProtoMessage() {}
func (*ContainerResources) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{3}
return fileDescriptor_00212fb1f9d3bf1c, []int{5}
}
func (m *ContainerResources) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -265,7 +356,7 @@ type ContainerDevices struct {
func (m *ContainerDevices) Reset() { *m = ContainerDevices{} }
func (*ContainerDevices) ProtoMessage() {}
func (*ContainerDevices) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{4}
return fileDescriptor_00212fb1f9d3bf1c, []int{6}
}
func (m *ContainerDevices) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -325,7 +416,7 @@ type TopologyInfo struct {
func (m *TopologyInfo) Reset() { *m = TopologyInfo{} }
func (*TopologyInfo) ProtoMessage() {}
func (*TopologyInfo) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{5}
return fileDescriptor_00212fb1f9d3bf1c, []int{7}
}
func (m *TopologyInfo) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -371,7 +462,7 @@ type NUMANode struct {
func (m *NUMANode) Reset() { *m = NUMANode{} }
func (*NUMANode) ProtoMessage() {}
func (*NUMANode) Descriptor() ([]byte, []int) {
return fileDescriptor_00212fb1f9d3bf1c, []int{6}
return fileDescriptor_00212fb1f9d3bf1c, []int{8}
}
func (m *NUMANode) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -408,6 +499,8 @@ func (m *NUMANode) GetID() int64 {
}
func init() {
proto.RegisterType((*AllocatableResourcesRequest)(nil), "v1.AllocatableResourcesRequest")
proto.RegisterType((*AllocatableResourcesResponse)(nil), "v1.AllocatableResourcesResponse")
proto.RegisterType((*ListPodResourcesRequest)(nil), "v1.ListPodResourcesRequest")
proto.RegisterType((*ListPodResourcesResponse)(nil), "v1.ListPodResourcesResponse")
proto.RegisterType((*PodResources)(nil), "v1.PodResources")
@ -420,34 +513,37 @@ func init() {
func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) }
var fileDescriptor_00212fb1f9d3bf1c = []byte{
// 424 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xcd, 0xda, 0xa5, 0xad, 0x07, 0x17, 0x55, 0x2b, 0x44, 0x4d, 0x08, 0x56, 0xb4, 0x5c, 0x7a,
0x00, 0x57, 0x0d, 0x82, 0x3b, 0x34, 0x17, 0x4b, 0x10, 0xc1, 0x0a, 0x0e, 0x9c, 0x22, 0xc7, 0xbb,
0x35, 0x96, 0xa8, 0x67, 0xeb, 0xb5, 0x23, 0xb8, 0x71, 0xe0, 0x03, 0xf8, 0xac, 0x1e, 0x39, 0x72,
0xa4, 0xe6, 0x47, 0xd0, 0xae, 0x71, 0xe3, 0x90, 0x70, 0xf2, 0xcc, 0x7b, 0x33, 0xef, 0x8d, 0x77,
0x06, 0xbc, 0x44, 0xe5, 0x91, 0x2a, 0xb1, 0x42, 0xea, 0x2c, 0x4f, 0x87, 0x4f, 0xb2, 0xbc, 0xfa,
0x58, 0x2f, 0xa2, 0x14, 0x2f, 0x4e, 0x32, 0xcc, 0xf0, 0xc4, 0x52, 0x8b, 0xfa, 0xdc, 0x66, 0x36,
0xb1, 0x51, 0xdb, 0xc2, 0xee, 0xc3, 0xd1, 0xab, 0x5c, 0x57, 0x6f, 0x50, 0x70, 0xa9, 0xb1, 0x2e,
0x53, 0xa9, 0xb9, 0xbc, 0xac, 0xa5, 0xae, 0xd8, 0x5b, 0x08, 0x36, 0x29, 0xad, 0xb0, 0xd0, 0x92,
0x3e, 0x83, 0x03, 0x85, 0x62, 0x5e, 0x76, 0x44, 0x40, 0xc6, 0xee, 0xf1, 0xed, 0xc9, 0x61, 0xb4,
0x3c, 0x8d, 0xd6, 0x1a, 0x7c, 0xd5, 0xcb, 0xd8, 0x67, 0xf0, 0xfb, 0x2c, 0xa5, 0xb0, 0x53, 0x24,
0x17, 0x32, 0x20, 0x63, 0x72, 0xec, 0x71, 0x1b, 0xd3, 0x11, 0x78, 0xe6, 0xab, 0x55, 0x92, 0xca,
0xc0, 0xb1, 0xc4, 0x0a, 0xa0, 0xcf, 0x01, 0x52, 0x2c, 0xaa, 0x24, 0x2f, 0x64, 0xa9, 0x03, 0xd7,
0xba, 0xde, 0x33, 0xae, 0x67, 0x1d, 0xba, 0xf2, 0xee, 0x55, 0xb2, 0x4b, 0xa0, 0x9b, 0x15, 0x5b,
0xfd, 0x23, 0xd8, 0x13, 0x72, 0x99, 0x9b, 0x9f, 0x72, 0xac, 0xfc, 0xdd, 0x35, 0xf9, 0x69, 0xcb,
0xf1, 0xae, 0x88, 0x1e, 0xc1, 0x5e, 0xaa, 0xea, 0x79, 0x2e, 0xda, 0x71, 0x5c, 0xbe, 0x9b, 0xaa,
0x3a, 0x16, 0x9a, 0x7d, 0x23, 0x70, 0xf8, 0x6f, 0x1b, 0x7d, 0x04, 0x07, 0xdd, 0xa3, 0xcd, 0x7b,
0xd6, 0x7e, 0x07, 0xce, 0xcc, 0x08, 0x0f, 0x01, 0x5a, 0x75, 0xab, 0x6a, 0xa6, 0xf0, 0xb8, 0xd7,
0x22, 0xb1, 0xd0, 0xf4, 0x31, 0xec, 0x57, 0xa8, 0xf0, 0x13, 0x66, 0x5f, 0x02, 0x77, 0x4c, 0xba,
0x77, 0x7f, 0xf7, 0x17, 0x8b, 0x8b, 0x73, 0xe4, 0x37, 0x15, 0x6c, 0x02, 0x7e, 0x9f, 0xa1, 0x0c,
0x6e, 0x15, 0x28, 0x6e, 0x56, 0xe6, 0x9b, 0xd6, 0xd9, 0xfb, 0xd7, 0x2f, 0x66, 0x28, 0x24, 0x6f,
0x29, 0x36, 0x84, 0xfd, 0x0e, 0xa2, 0x77, 0xc0, 0x89, 0xa7, 0x76, 0x4c, 0x97, 0x3b, 0xf1, 0x74,
0xf2, 0x01, 0x68, 0x7f, 0x87, 0xe6, 0x44, 0x64, 0x49, 0xcf, 0x60, 0xc7, 0x44, 0xf4, 0x81, 0x91,
0xfb, 0xcf, 0x45, 0x0d, 0x47, 0xdb, 0xc9, 0xf6, 0xa6, 0xd8, 0xe0, 0xe5, 0xe8, 0xea, 0x3a, 0x24,
0x3f, 0xaf, 0xc3, 0xc1, 0xd7, 0x26, 0x24, 0x57, 0x4d, 0x48, 0x7e, 0x34, 0x21, 0xf9, 0xd5, 0x84,
0xe4, 0xfb, 0xef, 0x70, 0xb0, 0xd8, 0xb5, 0x17, 0xfb, 0xf4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff,
0x1f, 0x52, 0x67, 0x23, 0xf1, 0x02, 0x00, 0x00,
// 480 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xcd, 0x6e, 0xd3, 0x40,
0x10, 0x80, 0xb3, 0x76, 0x69, 0x9b, 0xc1, 0x45, 0xd5, 0x0a, 0x11, 0x93, 0xa6, 0xc6, 0x5a, 0x2e,
0x39, 0x80, 0xab, 0x06, 0xc1, 0xbd, 0x34, 0x12, 0xb2, 0x04, 0x11, 0xac, 0xe0, 0x4a, 0xe4, 0xd8,
0x5b, 0x63, 0x29, 0xf5, 0x6c, 0xbd, 0x76, 0x04, 0x37, 0x0e, 0x3c, 0x00, 0xaf, 0xc3, 0x1b, 0xf4,
0xc8, 0x91, 0x23, 0x0d, 0x2f, 0x82, 0xbc, 0x8e, 0x53, 0x87, 0xa4, 0x48, 0x3d, 0x79, 0x66, 0xbe,
0xf9, 0xf3, 0xcc, 0x2c, 0xb4, 0x03, 0x99, 0x78, 0x32, 0xc3, 0x1c, 0xa9, 0x31, 0x3b, 0xee, 0x3e,
0x8d, 0x93, 0xfc, 0x53, 0x31, 0xf1, 0x42, 0x3c, 0x3f, 0x8a, 0x31, 0xc6, 0x23, 0x8d, 0x26, 0xc5,
0x99, 0xd6, 0xb4, 0xa2, 0xa5, 0x2a, 0x84, 0x1d, 0xc2, 0xc1, 0xc9, 0x74, 0x8a, 0x61, 0x90, 0x07,
0x93, 0xa9, 0xe0, 0x42, 0x61, 0x91, 0x85, 0x42, 0x71, 0x71, 0x51, 0x08, 0x95, 0xb3, 0x18, 0x7a,
0x9b, 0xb1, 0x92, 0x98, 0x2a, 0x41, 0x3d, 0xd8, 0x89, 0xc4, 0x2c, 0x09, 0x85, 0xb2, 0x89, 0x6b,
0xf6, 0xef, 0x0e, 0xee, 0x7b, 0xb3, 0x63, 0xef, 0x14, 0xd3, 0x3c, 0x48, 0x52, 0x91, 0x0d, 0x2b,
0xc6, 0x6b, 0x27, 0xda, 0x81, 0x9d, 0x50, 0x16, 0xe3, 0x24, 0x52, 0xb6, 0xe1, 0x9a, 0x7d, 0x93,
0x6f, 0x87, 0xb2, 0xf0, 0x23, 0xc5, 0x1e, 0x42, 0xe7, 0x75, 0xa2, 0xf2, 0xb7, 0x18, 0xad, 0xf5,
0xf0, 0x0e, 0xec, 0x75, 0xb4, 0xa8, 0xff, 0x1c, 0xf6, 0x24, 0x46, 0xe3, 0xac, 0x06, 0x8b, 0x2e,
0xf6, 0xcb, 0x2e, 0x56, 0x02, 0x2c, 0xd9, 0xd0, 0xd8, 0x67, 0xb0, 0x9a, 0x94, 0x52, 0xd8, 0x4a,
0x83, 0x73, 0x61, 0x13, 0x97, 0xf4, 0xdb, 0x5c, 0xcb, 0xb4, 0x07, 0xed, 0xf2, 0xab, 0x64, 0x10,
0x0a, 0xdb, 0xd0, 0xe0, 0xda, 0x40, 0x5f, 0x00, 0x84, 0xf5, 0x5f, 0x2a, 0xdb, 0xd4, 0x55, 0x1f,
0xac, 0xfc, 0xfb, 0x75, 0xed, 0x86, 0x27, 0xbb, 0x00, 0xba, 0xee, 0xb1, 0xb1, 0x7e, 0x63, 0xb4,
0xc6, 0x2d, 0x47, 0x6b, 0xae, 0x8c, 0xf6, 0x1b, 0x81, 0xfd, 0x7f, 0xc3, 0xe8, 0x63, 0xd8, 0xab,
0x87, 0x36, 0x6e, 0x94, 0xb6, 0x6a, 0xe3, 0xa8, 0x6c, 0xe1, 0x10, 0xa0, 0xca, 0xbe, 0x5c, 0x58,
0x9b, 0xb7, 0x2b, 0x8b, 0x1f, 0x29, 0xfa, 0x04, 0x76, 0x73, 0x94, 0x38, 0xc5, 0xf8, 0x8b, 0x6d,
0xba, 0xa4, 0x9e, 0xfb, 0xfb, 0x85, 0xcd, 0x4f, 0xcf, 0x90, 0x2f, 0x3d, 0xd8, 0x00, 0xac, 0x26,
0xa1, 0x0c, 0xee, 0xa4, 0x18, 0x2d, 0x57, 0x66, 0x95, 0xa1, 0xa3, 0x0f, 0x6f, 0x4e, 0x46, 0x18,
0x09, 0x5e, 0x21, 0xd6, 0x85, 0xdd, 0xda, 0x44, 0xef, 0x81, 0xe1, 0x0f, 0x75, 0x9b, 0x26, 0x37,
0xfc, 0xe1, 0xe0, 0x07, 0x01, 0xda, 0x5c, 0x62, 0x79, 0x23, 0x22, 0xa3, 0xa7, 0xb0, 0x55, 0x4a,
0xf4, 0xa0, 0xcc, 0x77, 0xc3, 0x49, 0x75, 0x7b, 0x9b, 0x61, 0x75, 0x54, 0xac, 0x45, 0x3f, 0x42,
0xe7, 0x95, 0xc8, 0x37, 0x5d, 0x3e, 0x7d, 0x54, 0x86, 0xfe, 0xe7, 0xc9, 0x74, 0xdd, 0x9b, 0x1d,
0xea, 0xfc, 0x2f, 0x7b, 0x97, 0x57, 0x0e, 0xf9, 0x75, 0xe5, 0xb4, 0xbe, 0xce, 0x1d, 0x72, 0x39,
0x77, 0xc8, 0xcf, 0xb9, 0x43, 0x7e, 0xcf, 0x1d, 0xf2, 0xfd, 0x8f, 0xd3, 0x9a, 0x6c, 0xeb, 0xa7,
0xf9, 0xec, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x70, 0xd4, 0x4f, 0xda, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -463,6 +559,7 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type PodResourcesListerClient interface {
List(ctx context.Context, in *ListPodResourcesRequest, opts ...grpc.CallOption) (*ListPodResourcesResponse, error)
GetAllocatableResources(ctx context.Context, in *AllocatableResourcesRequest, opts ...grpc.CallOption) (*AllocatableResourcesResponse, error)
}
type podResourcesListerClient struct {
@ -482,9 +579,19 @@ func (c *podResourcesListerClient) List(ctx context.Context, in *ListPodResource
return out, nil
}
func (c *podResourcesListerClient) GetAllocatableResources(ctx context.Context, in *AllocatableResourcesRequest, opts ...grpc.CallOption) (*AllocatableResourcesResponse, error) {
out := new(AllocatableResourcesResponse)
err := c.cc.Invoke(ctx, "/v1.PodResourcesLister/GetAllocatableResources", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// PodResourcesListerServer is the server API for PodResourcesLister service.
type PodResourcesListerServer interface {
List(context.Context, *ListPodResourcesRequest) (*ListPodResourcesResponse, error)
GetAllocatableResources(context.Context, *AllocatableResourcesRequest) (*AllocatableResourcesResponse, error)
}
// UnimplementedPodResourcesListerServer can be embedded to have forward compatible implementations.
@ -494,6 +601,9 @@ type UnimplementedPodResourcesListerServer struct {
func (*UnimplementedPodResourcesListerServer) List(ctx context.Context, req *ListPodResourcesRequest) (*ListPodResourcesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
func (*UnimplementedPodResourcesListerServer) GetAllocatableResources(ctx context.Context, req *AllocatableResourcesRequest) (*AllocatableResourcesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetAllocatableResources not implemented")
}
func RegisterPodResourcesListerServer(s *grpc.Server, srv PodResourcesListerServer) {
s.RegisterService(&_PodResourcesLister_serviceDesc, srv)
@ -517,6 +627,24 @@ func _PodResourcesLister_List_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _PodResourcesLister_GetAllocatableResources_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AllocatableResourcesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PodResourcesListerServer).GetAllocatableResources(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/v1.PodResourcesLister/GetAllocatableResources",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PodResourcesListerServer).GetAllocatableResources(ctx, req.(*AllocatableResourcesRequest))
}
return interceptor(ctx, in, info, handler)
}
var _PodResourcesLister_serviceDesc = grpc.ServiceDesc{
ServiceName: "v1.PodResourcesLister",
HandlerType: (*PodResourcesListerServer)(nil),
@ -525,11 +653,94 @@ var _PodResourcesLister_serviceDesc = grpc.ServiceDesc{
MethodName: "List",
Handler: _PodResourcesLister_List_Handler,
},
{
MethodName: "GetAllocatableResources",
Handler: _PodResourcesLister_GetAllocatableResources_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "api.proto",
}
func (m *AllocatableResourcesRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AllocatableResourcesRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *AllocatableResourcesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
return len(dAtA) - i, nil
}
func (m *AllocatableResourcesResponse) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *AllocatableResourcesResponse) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *AllocatableResourcesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.CpuIds) > 0 {
dAtA2 := make([]byte, len(m.CpuIds)*10)
var j1 int
for _, num1 := range m.CpuIds {
num := uint64(num1)
for num >= 1<<7 {
dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j1++
}
dAtA2[j1] = uint8(num)
j1++
}
i -= j1
copy(dAtA[i:], dAtA2[:j1])
i = encodeVarintApi(dAtA, i, uint64(j1))
i--
dAtA[i] = 0x12
}
if len(m.Devices) > 0 {
for iNdEx := len(m.Devices) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Devices[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintApi(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *ListPodResourcesRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -662,21 +873,21 @@ func (m *ContainerResources) MarshalToSizedBuffer(dAtA []byte) (int, error) {
var l int
_ = l
if len(m.CpuIds) > 0 {
dAtA2 := make([]byte, len(m.CpuIds)*10)
var j1 int
dAtA4 := make([]byte, len(m.CpuIds)*10)
var j3 int
for _, num1 := range m.CpuIds {
num := uint64(num1)
for num >= 1<<7 {
dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80)
dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j1++
j3++
}
dAtA2[j1] = uint8(num)
j1++
dAtA4[j3] = uint8(num)
j3++
}
i -= j1
copy(dAtA[i:], dAtA2[:j1])
i = encodeVarintApi(dAtA, i, uint64(j1))
i -= j3
copy(dAtA[i:], dAtA4[:j3])
i = encodeVarintApi(dAtA, i, uint64(j3))
i--
dAtA[i] = 0x1a
}
@ -831,6 +1042,37 @@ func encodeVarintApi(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return base
}
func (m *AllocatableResourcesRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
return n
}
func (m *AllocatableResourcesResponse) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Devices) > 0 {
for _, e := range m.Devices {
l = e.Size()
n += 1 + l + sovApi(uint64(l))
}
}
if len(m.CpuIds) > 0 {
l = 0
for _, e := range m.CpuIds {
l += sovApi(uint64(e))
}
n += 1 + sovApi(uint64(l)) + l
}
return n
}
func (m *ListPodResourcesRequest) Size() (n int) {
if m == nil {
return 0
@ -960,6 +1202,31 @@ func sovApi(x uint64) (n int) {
func sozApi(x uint64) (n int) {
return sovApi(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *AllocatableResourcesRequest) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&AllocatableResourcesRequest{`,
`}`,
}, "")
return s
}
func (this *AllocatableResourcesResponse) String() string {
if this == nil {
return "nil"
}
repeatedStringForDevices := "[]*ContainerDevices{"
for _, f := range this.Devices {
repeatedStringForDevices += strings.Replace(f.String(), "ContainerDevices", "ContainerDevices", 1) + ","
}
repeatedStringForDevices += "}"
s := strings.Join([]string{`&AllocatableResourcesResponse{`,
`Devices:` + repeatedStringForDevices + `,`,
`CpuIds:` + fmt.Sprintf("%v", this.CpuIds) + `,`,
`}`,
}, "")
return s
}
func (this *ListPodResourcesRequest) String() string {
if this == nil {
return "nil"
@ -1063,6 +1330,216 @@ func valueToStringApi(v interface{}) string {
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
func (m *AllocatableResourcesRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AllocatableResourcesRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AllocatableResourcesRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *AllocatableResourcesResponse) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: AllocatableResourcesResponse: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: AllocatableResourcesResponse: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Devices", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Devices = append(m.Devices, &ContainerDevices{})
if err := m.Devices[len(m.Devices)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType == 0 {
var v int64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.CpuIds = append(m.CpuIds, v)
} else if wireType == 2 {
var packedLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
packedLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if packedLen < 0 {
return ErrInvalidLengthApi
}
postIndex := iNdEx + packedLen
if postIndex < 0 {
return ErrInvalidLengthApi
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
var elementCount int
var count int
for _, integer := range dAtA[iNdEx:postIndex] {
if integer < 128 {
count++
}
}
elementCount = count
if elementCount != 0 && len(m.CpuIds) == 0 {
m.CpuIds = make([]int64, 0, elementCount)
}
for iNdEx < postIndex {
var v int64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowApi
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.CpuIds = append(m.CpuIds, v)
}
} else {
return fmt.Errorf("proto: wrong wireType = %d for field CpuIds", wireType)
}
default:
iNdEx = preIndex
skippy, err := skipApi(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthApi
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ListPodResourcesRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

View File

@ -18,6 +18,15 @@ option (gogoproto.goproto_unrecognized_all) = false;
// node resources consumed by pods and containers on the node
service PodResourcesLister {
rpc List(ListPodResourcesRequest) returns (ListPodResourcesResponse) {}
rpc GetAllocatableResources(AllocatableResourcesRequest) returns (AllocatableResourcesResponse) {}
}
message AllocatableResourcesRequest {}
// AllocatableResourcesResponse contains information about all the devices known to the kubelet
message AllocatableResourcesResponse {
repeated ContainerDevices devices = 1;
repeated int64 cpu_ids = 2;
}
// ListPodResourcesRequest is the request made to the PodResourcesLister service
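A minimal sketch of how a client might query the new endpoint, assuming the podresources.GetV1Client and util.LocalEndpoint helpers exercised by the e2e tests below and the default /var/lib/kubelet/pod-resources socket directory; the timeout and message-size values are illustrative, not prescribed by this change.
package main
import (
	"context"
	"fmt"
	"time"
	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/util"
)
func main() {
	// Resolve the local pod-resources socket; the directory is the kubelet default
	// and may differ on a given deployment (assumption).
	endpoint, err := util.LocalEndpoint("/var/lib/kubelet/pod-resources", podresources.Socket)
	if err != nil {
		panic(err)
	}
	// Illustrative connection timeout and max gRPC message size.
	cli, conn, err := podresources.GetV1Client(endpoint, 10*time.Second, 1024*1024*16)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// Ask the kubelet for the node-level allocatable devices and CPUs.
	resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
	if err != nil {
		panic(err) // e.g. the KubeletPodResourcesGetAllocatable feature gate is disabled
	}
	fmt.Printf("allocatable CPUs: %v\n", resp.GetCpuIds())
	for _, dev := range resp.GetDevices() {
		fmt.Printf("resource %q -> devices %v\n", dev.GetResourceName(), dev.GetDeviceIds())
	}
}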

View File

@ -81,13 +81,17 @@ func makeCPUManagerPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod {
}
}
func deletePodSyncByName(f *framework.Framework, podName string) {
gp := int64(0)
delOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gp,
}
f.PodClient().DeleteSync(podName, delOpts, framework.DefaultPodDeletionTimeout)
}
func deletePods(f *framework.Framework, podNames []string) {
for _, podName := range podNames {
gp := int64(0)
delOpts := metav1.DeleteOptions{
GracePeriodSeconds: &gp,
}
f.PodClient().DeleteSync(podName, delOpts, framework.DefaultPodDeletionTimeout)
deletePodSyncByName(f, podName)
}
}
@ -206,6 +210,10 @@ func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.K
}
func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) {
return configureCPUManagerInKubelet(f, cleanStateFile, cpuset.CPUSet{})
}
func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) {
// Enable CPU Manager in Kubelet with static policy.
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
@ -235,15 +243,21 @@ func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (old
// Set the CPU Manager reconcile period to 1 second.
newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
// The Kubelet panics if either kube-reserved or system-reserved is not set
// when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that
// kubelet doesn't panic.
if newCfg.KubeReserved == nil {
newCfg.KubeReserved = map[string]string{}
}
if reservedSystemCPUs.Size() > 0 {
cpus := reservedSystemCPUs.String()
framework.Logf("configureCPUManagerInKubelet: using reservedSystemCPUs=%q", cpus)
newCfg.ReservedSystemCPUs = cpus
} else {
// The Kubelet panics if either kube-reserved or system-reserved is not set
// when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that
// kubelet doesn't panic.
if newCfg.KubeReserved == nil {
newCfg.KubeReserved = map[string]string{}
}
if _, ok := newCfg.KubeReserved["cpu"]; !ok {
newCfg.KubeReserved["cpu"] = "200m"
if _, ok := newCfg.KubeReserved["cpu"]; !ok {
newCfg.KubeReserved["cpu"] = "200m"
}
}
// Update the Kubelet configuration.
framework.ExpectNoError(setKubeletConfiguration(f, newCfg))

View File

@ -0,0 +1,754 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2enode
import (
"context"
"fmt"
"io/ioutil"
"strings"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
kubefeatures "k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
)
type podDesc struct {
podName string
cntName string
resourceName string
resourceAmount int
cpuCount int
}
func makePodResourcesTestPod(desc podDesc) *v1.Pod {
cnt := v1.Container{
Name: desc.cntName,
Image: busyboxImage,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{},
Limits: v1.ResourceList{},
},
Command: []string{"sh", "-c", "sleep 1d"},
}
if desc.cpuCount > 0 {
cnt.Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%d", desc.cpuCount))
cnt.Resources.Limits[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%d", desc.cpuCount))
// the exact amount does not matter here, we only need the pod to be in the Guaranteed QoS class
cnt.Resources.Requests[v1.ResourceMemory] = resource.MustParse("100Mi")
cnt.Resources.Limits[v1.ResourceMemory] = resource.MustParse("100Mi")
}
if desc.resourceName != "" && desc.resourceAmount > 0 {
cnt.Resources.Requests[v1.ResourceName(desc.resourceName)] = resource.MustParse(fmt.Sprintf("%d", desc.resourceAmount))
cnt.Resources.Limits[v1.ResourceName(desc.resourceName)] = resource.MustParse(fmt.Sprintf("%d", desc.resourceAmount))
}
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: desc.podName,
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
Containers: []v1.Container{
cnt,
},
},
}
}
func logPodResources(podIdx int, pr *kubeletpodresourcesv1.PodResources) {
ns := pr.GetNamespace()
cnts := pr.GetContainers()
if len(cnts) == 0 {
framework.Logf("#%02d/%02d/%02d - %s/%s/%s No containers", podIdx, 0, 0, ns, pr.GetName(), "_")
return
}
for cntIdx, cnt := range cnts {
if len(cnt.Devices) == 0 {
framework.Logf("#%02d/%02d/%02d - %s/%s/%s cpus -> %v resources -> none", podIdx, cntIdx, 0, ns, pr.GetName(), cnt.Name, cnt.CpuIds)
continue
}
for devIdx, dev := range cnt.Devices {
framework.Logf("#%02d/%02d/%02d - %s/%s/%s cpus -> %v %s -> %s", podIdx, cntIdx, devIdx, ns, pr.GetName(), cnt.Name, cnt.CpuIds, dev.ResourceName, strings.Join(dev.DeviceIds, ", "))
}
}
}
type podResMap map[string]map[string]kubeletpodresourcesv1.ContainerResources
func getPodResources(cli kubeletpodresourcesv1.PodResourcesListerClient) podResMap {
resp, err := cli.List(context.TODO(), &kubeletpodresourcesv1.ListPodResourcesRequest{})
framework.ExpectNoError(err)
res := make(map[string]map[string]kubeletpodresourcesv1.ContainerResources)
for idx, podResource := range resp.GetPodResources() {
// to make troubleshooting easier
logPodResources(idx, podResource)
cnts := make(map[string]kubeletpodresourcesv1.ContainerResources)
for _, cnt := range podResource.GetContainers() {
cnts[cnt.GetName()] = *cnt
}
res[podResource.GetName()] = cnts
}
return res
}
type testPodData struct {
PodMap map[string]*v1.Pod
}
func newTestPodData() *testPodData {
return &testPodData{
PodMap: make(map[string]*v1.Pod),
}
}
func (tpd *testPodData) createPodsForTest(f *framework.Framework, podReqs []podDesc) {
for _, podReq := range podReqs {
pod := makePodResourcesTestPod(podReq)
pod = f.PodClient().CreateSync(pod)
framework.Logf("created pod %s", podReq.podName)
tpd.PodMap[podReq.podName] = pod
}
}
/* deletePodsForTest cleans up all the pods run for a testcase. Must ensure proper cleanup. */
func (tpd *testPodData) deletePodsForTest(f *framework.Framework) {
podNS := f.Namespace.Name
var wg sync.WaitGroup
for podName := range tpd.PodMap {
wg.Add(1)
go func(podName string) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
deletePodSyncByName(f, podName)
waitForAllContainerRemoval(podName, podNS)
}(podName)
}
wg.Wait()
}
/* deletePod removes a pod during a test. Should do a best-effort cleanup. */
func (tpd *testPodData) deletePod(f *framework.Framework, podName string) {
_, ok := tpd.PodMap[podName]
if !ok {
return
}
deletePodSyncByName(f, podName)
delete(tpd.PodMap, podName)
}
func findContainerDeviceByName(devs []*kubeletpodresourcesv1.ContainerDevices, resourceName string) *kubeletpodresourcesv1.ContainerDevices {
for _, dev := range devs {
if dev.ResourceName == resourceName {
return dev
}
}
return nil
}
func matchPodDescWithResources(expected []podDesc, found podResMap) error {
for _, podReq := range expected {
framework.Logf("matching: %#v", podReq)
podInfo, ok := found[podReq.podName]
if !ok {
return fmt.Errorf("no pod resources for pod %q", podReq.podName)
}
cntInfo, ok := podInfo[podReq.cntName]
if !ok {
return fmt.Errorf("no container resources for pod %q container %q", podReq.podName, podReq.cntName)
}
if podReq.cpuCount > 0 {
if len(cntInfo.CpuIds) != podReq.cpuCount {
return fmt.Errorf("pod %q container %q expected %d cpus got %v", podReq.podName, podReq.cntName, podReq.cpuCount, cntInfo.CpuIds)
}
}
if podReq.resourceName != "" && podReq.resourceAmount > 0 {
dev := findContainerDeviceByName(cntInfo.GetDevices(), podReq.resourceName)
if dev == nil {
return fmt.Errorf("pod %q container %q expected data for resource %q not found", podReq.podName, podReq.cntName, podReq.resourceName)
}
if len(dev.DeviceIds) != podReq.resourceAmount {
return fmt.Errorf("pod %q container %q resource %q expected %d items got %v", podReq.podName, podReq.cntName, podReq.resourceName, podReq.resourceAmount, dev.DeviceIds)
}
} else {
devs := cntInfo.GetDevices()
if len(devs) > 0 {
return fmt.Errorf("pod %q container %q expected no resources, got %v", podReq.podName, podReq.cntName, devs)
}
}
}
return nil
}
func expectPodResources(offset int, cli kubeletpodresourcesv1.PodResourcesListerClient, expected []podDesc) {
gomega.EventuallyWithOffset(1+offset, func() error {
found := getPodResources(cli)
return matchPodDescWithResources(expected, found)
}, time.Minute, 10*time.Second).Should(gomega.BeNil())
}
func filterOutDesc(descs []podDesc, name string) []podDesc {
var ret []podDesc
for _, desc := range descs {
if desc.podName == name {
continue
}
ret = append(ret, desc)
}
return ret
}
func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData) {
var tpd *testPodData
var found podResMap
var expected []podDesc
var extra podDesc
expectedBasePods := 0 /* nothing but pods we create */
if sd != nil {
expectedBasePods = 1 // sriovdp
}
ginkgo.By("checking the output when no pods are present")
found = getPodResources(cli)
gomega.ExpectWithOffset(1, found).To(gomega.HaveLen(expectedBasePods), "base pod expectation mismatch")
tpd = newTestPodData()
ginkgo.By("checking the output when only pods which don't require resources are present")
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
},
{
podName: "pod-01",
cntName: "cnt-00",
},
}
tpd.createPodsForTest(f, expected)
expectPodResources(1, cli, expected)
tpd.deletePodsForTest(f)
tpd = newTestPodData()
ginkgo.By("checking the output when only a subset of pods require resources")
if sd != nil {
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
},
{
podName: "pod-01",
cntName: "cnt-00",
resourceName: sd.resourceName,
resourceAmount: 1,
cpuCount: 2,
},
{
podName: "pod-02",
cntName: "cnt-00",
cpuCount: 2,
},
{
podName: "pod-03",
cntName: "cnt-00",
resourceName: sd.resourceName,
resourceAmount: 1,
cpuCount: 1,
},
}
} else {
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
},
{
podName: "pod-01",
cntName: "cnt-00",
cpuCount: 2,
},
{
podName: "pod-02",
cntName: "cnt-00",
cpuCount: 2,
},
{
podName: "pod-03",
cntName: "cnt-00",
cpuCount: 1,
},
}
}
tpd.createPodsForTest(f, expected)
expectPodResources(1, cli, expected)
tpd.deletePodsForTest(f)
tpd = newTestPodData()
ginkgo.By("checking the output when creating pods which require resources between calls")
if sd != nil {
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
},
{
podName: "pod-01",
cntName: "cnt-00",
resourceName: sd.resourceName,
resourceAmount: 1,
cpuCount: 2,
},
{
podName: "pod-02",
cntName: "cnt-00",
cpuCount: 2,
},
}
} else {
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
},
{
podName: "pod-01",
cntName: "cnt-00",
cpuCount: 2,
},
{
podName: "pod-02",
cntName: "cnt-00",
cpuCount: 2,
},
}
}
tpd.createPodsForTest(f, expected)
expectPodResources(1, cli, expected)
if sd != nil {
extra = podDesc{
podName: "pod-03",
cntName: "cnt-00",
resourceName: sd.resourceName,
resourceAmount: 1,
cpuCount: 1,
}
} else {
extra = podDesc{
podName: "pod-03",
cntName: "cnt-00",
cpuCount: 1,
}
}
tpd.createPodsForTest(f, []podDesc{
extra,
})
expected = append(expected, extra)
expectPodResources(1, cli, expected)
tpd.deletePodsForTest(f)
tpd = newTestPodData()
ginkgo.By("checking the output when deleting pods which require resources between calls")
if sd != nil {
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
cpuCount: 1,
},
{
podName: "pod-01",
cntName: "cnt-00",
resourceName: sd.resourceName,
resourceAmount: 1,
cpuCount: 2,
},
{
podName: "pod-02",
cntName: "cnt-00",
},
{
podName: "pod-03",
cntName: "cnt-00",
resourceName: sd.resourceName,
resourceAmount: 1,
cpuCount: 1,
},
}
} else {
expected = []podDesc{
{
podName: "pod-00",
cntName: "cnt-00",
cpuCount: 1,
},
{
podName: "pod-01",
cntName: "cnt-00",
cpuCount: 2,
},
{
podName: "pod-02",
cntName: "cnt-00",
},
{
podName: "pod-03",
cntName: "cnt-00",
cpuCount: 1,
},
}
}
tpd.createPodsForTest(f, expected)
expectPodResources(1, cli, expected)
tpd.deletePod(f, "pod-01")
expectedPostDelete := filterOutDesc(expected, "pod-01")
expectPodResources(1, cli, expectedPostDelete)
tpd.deletePodsForTest(f)
}
func podresourcesGetAllocatableResourcesTests(f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData, onlineCPUs, reservedSystemCPUs cpuset.CPUSet) {
ginkgo.By("checking the devices known to the kubelet")
resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectNoErrorWithOffset(1, err)
devs := resp.GetDevices()
allocatableCPUs := cpuset.NewCPUSetInt64(resp.GetCpuIds()...)
if onlineCPUs.Size() == 0 {
ginkgo.By("expecting no CPUs reported")
gomega.ExpectWithOffset(1, onlineCPUs.Size()).To(gomega.Equal(reservedSystemCPUs.Size()), "with no online CPUs, no CPUs should be reserved")
} else {
ginkgo.By(fmt.Sprintf("expecting online CPUs reported - online=%v (%d) reserved=%v (%d)", onlineCPUs, onlineCPUs.Size(), reservedSystemCPUs, reservedSystemCPUs.Size()))
if reservedSystemCPUs.Size() > onlineCPUs.Size() {
ginkgo.Fail("more reserved CPUs than online")
}
expectedCPUs := onlineCPUs.Difference(reservedSystemCPUs)
ginkgo.By(fmt.Sprintf("expecting CPUs '%v'='%v'", allocatableCPUs, expectedCPUs))
gomega.ExpectWithOffset(1, allocatableCPUs.Equals(expectedCPUs)).To(gomega.BeTrue(), "mismatch expecting CPUs")
}
if sd == nil { // no devices in the environment, so expect no devices
ginkgo.By("expecting no devices reported")
gomega.ExpectWithOffset(1, devs).To(gomega.BeEmpty(), fmt.Sprintf("got unexpected devices %#v", devs))
return
}
ginkgo.By(fmt.Sprintf("expecting some %q devices reported", sd.resourceName))
gomega.ExpectWithOffset(1, devs).ToNot(gomega.BeEmpty())
for _, dev := range devs {
framework.ExpectEqual(dev.ResourceName, sd.resourceName)
gomega.ExpectWithOffset(1, dev.DeviceIds).ToNot(gomega.BeEmpty())
}
}
// Serial because the test updates kubelet configuration.
var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:PodResources]", func() {
f := framework.NewDefaultFramework("podresources-test")
reservedSystemCPUs := cpuset.MustParse("1")
ginkgo.Context("With SRIOV devices in the system", func() {
ginkgo.It("should return the expected responses with cpumanager static policy enabled", func() {
// this is a very rough check. We just want to rule out systems that do NOT have enough resources
_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
if cpuAlloc < minCoreCount {
e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount)
}
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 {
e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
}
onlineCPUs, err := getOnlineCPUs()
framework.ExpectNoError(err)
// Make sure all the feature gates and the right settings are in place.
oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs)
defer func() {
// restore kubelet config
setOldKubeletConfig(f, oldCfg)
// Delete state file to allow repeated runs
deleteStateFile()
}()
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
sd := setupSRIOVConfigOrFail(f, configMap)
defer teardownSRIOVConfigOrFail(f, sd)
waitForSRIOVResources(f, sd)
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
waitForSRIOVResources(f, sd)
ginkgo.By("checking List()")
podresourcesListTests(f, cli, sd)
ginkgo.By("checking GetAllocatableResources()")
podresourcesGetAllocatableResourcesTests(f, cli, sd, onlineCPUs, reservedSystemCPUs)
})
ginkgo.It("should return the expected responses with cpumanager none policy", func() {
// current default is "none" policy - no need to restart the kubelet
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 {
e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device")
}
oldCfg := enablePodResourcesFeatureGateInKubelet(f)
defer func() {
// restore kubelet config
setOldKubeletConfig(f, oldCfg)
// Delete state file to allow repeated runs
deleteStateFile()
}()
configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile)
sd := setupSRIOVConfigOrFail(f, configMap)
defer teardownSRIOVConfigOrFail(f, sd)
waitForSRIOVResources(f, sd)
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
waitForSRIOVResources(f, sd)
// intentionally passing an empty cpuset instead of onlineCPUs because with the none policy
// we expect no allocatable CPUs to be reported - exclusive CPU allocation requires the static policy
podresourcesGetAllocatableResourcesTests(f, cli, sd, cpuset.CPUSet{}, cpuset.CPUSet{})
})
})
ginkgo.Context("Without SRIOV devices in the system", func() {
ginkgo.It("should return the expected responses with cpumanager static policy enabled", func() {
// this is a very rough check. We just want to rule out systems that do NOT have enough resources
_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
if cpuAlloc < minCoreCount {
e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount)
}
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount > 0 {
e2eskipper.Skipf("this test is meant to run on a system with no configured VF from SRIOV device")
}
onlineCPUs, err := getOnlineCPUs()
framework.ExpectNoError(err)
// Make sure all the feature gates and the right settings are in place.
oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs)
defer func() {
// restore kubelet config
setOldKubeletConfig(f, oldCfg)
// Delete state file to allow repeated runs
deleteStateFile()
}()
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
podresourcesListTests(f, cli, nil)
podresourcesGetAllocatableResourcesTests(f, cli, nil, onlineCPUs, reservedSystemCPUs)
})
ginkgo.It("should return the expected responses with cpumanager none policy", func() {
// current default is "none" policy - no need to restart the kubelet
if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount > 0 {
e2eskipper.Skipf("this test is meant to run on a system with no configured VF from SRIOV device")
}
oldCfg := enablePodResourcesFeatureGateInKubelet(f)
defer func() {
// restore kubelet config
setOldKubeletConfig(f, oldCfg)
// Delete state file to allow repeated runs
deleteStateFile()
}()
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
// intentionally passing an empty cpuset instead of onlineCPUs because with the none policy
// we expect no allocatable CPUs to be reported - exclusive CPU allocation requires the static policy
podresourcesGetAllocatableResourcesTests(f, cli, nil, cpuset.CPUSet{}, cpuset.CPUSet{})
})
ginkgo.It("should return the expected error with the feature gate disabled", func() {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesGetAllocatable) {
e2eskipper.Skipf("this test is meant to run with the POD Resources Extensions feature gate disabled")
}
endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
framework.ExpectNoError(err)
cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
framework.ExpectNoError(err)
defer conn.Close()
ginkgo.By("checking GetAllocatableResources fail if the feature gate is not enabled")
_, err = cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
framework.ExpectError(err, "With feature gate disabled, the call must fail")
})
})
})
func getOnlineCPUs() (cpuset.CPUSet, error) {
onlineCPUList, err := ioutil.ReadFile("/sys/devices/system/cpu/online")
if err != nil {
return cpuset.CPUSet{}, err
}
return cpuset.Parse(strings.TrimSpace(string(onlineCPUList)))
}
func configurePodResourcesInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) {
// we also need the CPU Manager with the static policy to be able to do meaningful testing
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
newCfg.FeatureGates = make(map[string]bool)
}
newCfg.FeatureGates["CPUManager"] = true
newCfg.FeatureGates["KubeletPodResourcesGetAllocatable"] = true
// After graduation of the CPU Manager feature to Beta, the CPU Manager
// "none" policy is ON by default. But when we set the CPU Manager policy to
// "static" in this test and the Kubelet is restarted so that the "static"
// policy can take effect, there will always be a conflict with the state
// checkpointed on disk (i.e., the policy checkpointed on disk will
// be "none" whereas we are trying to restart the Kubelet with the "static"
// policy). Therefore, we delete the state file so that we can proceed
// with the tests.
// Only delete the state file at the beginning of the tests.
if cleanStateFile {
deleteStateFile()
}
// Set the CPU Manager policy to static.
newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic)
// Set the CPU Manager reconcile period to 1 second.
newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second}
if reservedSystemCPUs.Size() > 0 {
cpus := reservedSystemCPUs.String()
framework.Logf("configurePodResourcesInKubelet: using reservedSystemCPUs=%q", cpus)
newCfg.ReservedSystemCPUs = cpus
} else {
// The Kubelet panics if either kube-reserved or system-reserved is not set
// when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that
// kubelet doesn't panic.
if newCfg.KubeReserved == nil {
newCfg.KubeReserved = map[string]string{}
}
if _, ok := newCfg.KubeReserved["cpu"]; !ok {
newCfg.KubeReserved["cpu"] = "200m"
}
}
// Update the Kubelet configuration.
framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
// Wait for the Kubelet to be ready.
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
return oldCfg
}
func enablePodResourcesFeatureGateInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) {
oldCfg, err := getCurrentKubeletConfig()
framework.ExpectNoError(err)
newCfg := oldCfg.DeepCopy()
if newCfg.FeatureGates == nil {
newCfg.FeatureGates = make(map[string]bool)
}
newCfg.FeatureGates["KubeletPodResourcesGetAllocatable"] = true
// Update the Kubelet configuration.
framework.ExpectNoError(setKubeletConfiguration(f, newCfg))
// Wait for the Kubelet to be ready.
gomega.Eventually(func() bool {
nodes, err := e2enode.TotalReady(f.ClientSet)
framework.ExpectNoError(err)
return nodes == 1
}, time.Minute, time.Second).Should(gomega.BeTrue())
return oldCfg
}

View File

@ -89,13 +89,17 @@ func detectCoresPerSocket() int {
return coreCount
}
func detectSRIOVDevices() int {
func countSRIOVDevices() (int, error) {
outData, err := exec.Command("/bin/sh", "-c", "ls /sys/bus/pci/devices/*/physfn | wc -w").Output()
framework.ExpectNoError(err)
if err != nil {
return -1, err
}
return strconv.Atoi(strings.TrimSpace(string(outData)))
}
devCount, err := strconv.Atoi(strings.TrimSpace(string(outData)))
func detectSRIOVDevices() int {
devCount, err := countSRIOVDevices()
framework.ExpectNoError(err)
return devCount
}
@ -387,6 +391,11 @@ func runTopologyManagerPolicySuiteTests(f *framework.Framework) {
runMultipleGuPods(f)
}
// waitForAllContainerRemoval waits until all the containers of a given pod are really gone.
// This is needed by the e2e tests which involve exclusive resource allocation (CPU manager, topology manager, podresources, etc.),
// because these tests must clean up after themselves so that each test runs in a pristine environment.
// The only way known so far to guarantee that is to introduce this wait.
// Worth noting, however, that this makes the test runtime noticeably longer.
func waitForAllContainerRemoval(podName, podNS string) {
rs, _, err := getCRIClient()
framework.ExpectNoError(err)
@ -434,7 +443,7 @@ func runTopologyManagerPositiveTest(f *framework.Framework, numPods int, ctnAttr
pod := pods[podID]
framework.Logf("deleting the pod %s/%s and waiting for container removal",
pod.Namespace, pod.Name)
deletePods(f, []string{pod.Name})
deletePodSyncByName(f, pod.Name)
waitForAllContainerRemoval(pod.Name, pod.Namespace)
}
}
@ -462,7 +471,7 @@ func runTopologyManagerNegativeTest(f *framework.Framework, ctnAttrs, initCtnAtt
framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason)
}
deletePods(f, []string{pod.Name})
deletePodSyncByName(f, pod.Name)
}
func isTopologyAffinityError(pod *v1.Pod) bool {
@ -565,7 +574,7 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) {
ginkgo.By(fmt.Sprintf("Delete SRIOV device plugin pod %s/%s", sd.pod.Namespace, sd.pod.Name))
err = f.ClientSet.CoreV1().Pods(sd.pod.Namespace).Delete(context.TODO(), sd.pod.Name, deleteOptions)
framework.ExpectNoError(err)
waitForContainerRemoval(sd.pod.Spec.Containers[0].Name, sd.pod.Name, sd.pod.Namespace)
waitForAllContainerRemoval(sd.pod.Name, sd.pod.Namespace)
ginkgo.By(fmt.Sprintf("Deleting configMap %v/%v", metav1.NamespaceSystem, sd.configMap.Name))
err = f.ClientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(context.TODO(), sd.configMap.Name, deleteOptions)