Implement all necessary methods to provide memory manager data under pod resources metrics

Signed-off-by: Artyom Lukianov <alukiano@redhat.com>
Artyom Lukianov 2021-04-22 18:13:46 +03:00
parent 24023f9fcc
commit 03830db82d
16 changed files with 239 additions and 10 deletions
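The change below extends the kubelet pod resources gRPC API (v1) so that `List` and `GetAllocatableResources` also report memory and hugepages assigned by the memory manager. A minimal client sketch follows; it assumes the default kubelet pod-resources socket path and that the memory manager is enabled on the node (otherwise the `Memory` lists come back empty). It is an illustration, not part of this commit.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"google.golang.org/grpc"
	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

func main() {
	// Default kubelet pod-resources socket; may differ on a given cluster.
	socket := "/var/lib/kubelet/pod-resources/kubelet.sock"

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Dial the unix socket; the endpoint is local-only and unauthenticated.
	conn, err := grpc.DialContext(ctx, socket,
		grpc.WithInsecure(),
		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
			return (&net.Dialer{}).DialContext(ctx, "unix", addr)
		}),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := podresourcesapi.NewPodResourcesListerClient(conn)
	resp, err := client.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
	if err != nil {
		panic(err)
	}

	// Print the new per-container memory information added by this commit.
	for _, pod := range resp.GetPodResources() {
		for _, container := range pod.GetContainers() {
			for _, mem := range container.GetMemory() {
				fmt.Printf("%s/%s: %s=%d bytes, NUMA nodes %v\n",
					pod.GetName(), container.GetName(),
					mem.GetMemoryType(), mem.GetSize_(), mem.GetTopology().GetNodes())
			}
		}
	}
}
```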

View File

@ -32,15 +32,17 @@ type v1PodResourcesServer struct {
podsProvider PodsProvider
devicesProvider DevicesProvider
cpusProvider CPUsProvider
memoryProvider MemoryProvider
}
// NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider
// with device information provided by the DevicesProvider
func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider, cpusProvider CPUsProvider) v1.PodResourcesListerServer {
func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider, cpusProvider CPUsProvider, memoryProvider MemoryProvider) v1.PodResourcesListerServer {
return &v1PodResourcesServer{
podsProvider: podsProvider,
devicesProvider: devicesProvider,
cpusProvider: cpusProvider,
memoryProvider: memoryProvider,
}
}
@ -65,6 +67,7 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource
Name: container.Name,
Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
CpuIds: p.cpusProvider.GetCPUs(string(pod.UID), container.Name),
Memory: p.memoryProvider.GetMemory(string(pod.UID), container.Name),
}
}
podResources[i] = &pRes
@ -90,5 +93,6 @@ func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req
return &v1.AllocatableResourcesResponse{
Devices: p.devicesProvider.GetAllocatableDevices(),
CpuIds: p.cpusProvider.GetAllocatableCPUs(),
Memory: p.memoryProvider.GetAllocatableMemory(),
}, nil
}
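The v1 server now takes a fourth provider. Its contract is not visible in these hunks (it is defined alongside DevicesProvider and CPUsProvider in the podresources package); the sketch below is inferred from the calls made above and may differ from the actual definition in naming and doc comments.

```go
// MemoryProvider knows how to report memory assigned to containers
// (sketch inferred from the GetMemory/GetAllocatableMemory calls above).
type MemoryProvider interface {
	// GetMemory returns the memory assigned to the given container.
	GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory
	// GetAllocatableMemory returns the allocatable memory of the node.
	GetAllocatableMemory() []*podresourcesapi.ContainerMemory
}
```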

View File

@ -31,6 +31,7 @@ import (
pkgfeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
)
func TestListPodResourcesV1(t *testing.T) {
@ -50,11 +51,25 @@ func TestListPodResourcesV1(t *testing.T) {
cpus := []int64{12, 23, 30}
memory := []*podresourcesapi.ContainerMemory{
{
MemoryType: "memory",
Size_: 1073741824,
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}},
},
{
MemoryType: "hugepages-1Gi",
Size_: 1073741824,
Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}},
},
}
for _, tc := range []struct {
desc string
pods []*v1.Pod
devices []*podresourcesapi.ContainerDevices
cpus []int64
memory []*podresourcesapi.ContainerMemory
expectedResponse *podresourcesapi.ListPodResourcesResponse
}{
{
@ -62,6 +77,7 @@ func TestListPodResourcesV1(t *testing.T) {
pods: []*v1.Pod{},
devices: []*podresourcesapi.ContainerDevices{},
cpus: []int64{},
memory: []*podresourcesapi.ContainerMemory{},
expectedResponse: &podresourcesapi.ListPodResourcesResponse{},
},
{
@ -84,6 +100,7 @@ func TestListPodResourcesV1(t *testing.T) {
},
devices: []*podresourcesapi.ContainerDevices{},
cpus: []int64{},
memory: []*podresourcesapi.ContainerMemory{},
expectedResponse: &podresourcesapi.ListPodResourcesResponse{
PodResources: []*podresourcesapi.PodResources{
{
@ -119,6 +136,7 @@ func TestListPodResourcesV1(t *testing.T) {
},
devices: devs,
cpus: cpus,
memory: memory,
expectedResponse: &podresourcesapi.ListPodResourcesResponse{
PodResources: []*podresourcesapi.PodResources{
{
@ -129,6 +147,7 @@ func TestListPodResourcesV1(t *testing.T) {
Name: containerName,
Devices: devs,
CpuIds: cpus,
Memory: memory,
},
},
},
@ -141,10 +160,12 @@ func TestListPodResourcesV1(t *testing.T) {
m.On("GetPods").Return(tc.pods)
m.On("GetDevices", string(podUID), containerName).Return(tc.devices)
m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus)
m.On("GetMemory", string(podUID), containerName).Return(tc.memory)
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{})
m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances())
server := NewV1PodResourcesServer(m, m, m)
m.On("GetAllocatableMemory").Return([]state.Block{})
server := NewV1PodResourcesServer(m, m, m, m)
resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
t.Errorf("want err = %v, got %q", nil, err)
@ -215,16 +236,65 @@ func TestAllocatableResources(t *testing.T) {
allCPUs := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
allMemory := []*podresourcesapi.ContainerMemory{
{
MemoryType: "memory",
Size_: 5368709120,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
MemoryType: "hugepages-2Mi",
Size_: 1073741824,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 0,
},
},
},
},
{
MemoryType: "memory",
Size_: 5368709120,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
{
MemoryType: "hugepages-2Mi",
Size_: 1073741824,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{
{
ID: 1,
},
},
},
},
}
for _, tc := range []struct {
desc string
allCPUs []int64
allDevices []*podresourcesapi.ContainerDevices
allMemory []*podresourcesapi.ContainerMemory
expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse
}{
{
desc: "no devices, no CPUs",
allCPUs: []int64{},
allDevices: []*podresourcesapi.ContainerDevices{},
allMemory: []*podresourcesapi.ContainerMemory{},
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{},
},
{
@ -235,6 +305,14 @@ func TestAllocatableResources(t *testing.T) {
CpuIds: allCPUs,
},
},
{
desc: "no devices, no CPUs, all memory",
allCPUs: []int64{},
allDevices: []*podresourcesapi.ContainerDevices{},
expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{
Memory: allMemory,
},
},
{
desc: "with devices, all CPUs",
allCPUs: allCPUs,
@ -361,10 +439,12 @@ func TestAllocatableResources(t *testing.T) {
m := new(mockProvider)
m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{})
m.On("GetCPUs", "", "").Return([]int64{})
m.On("GetMemory", "", "").Return([]*podresourcesapi.ContainerMemory{})
m.On("UpdateAllocatedDevices").Return()
m.On("GetAllocatableDevices").Return(tc.allDevices)
m.On("GetAllocatableCPUs").Return(tc.allCPUs)
server := NewV1PodResourcesServer(m, m, m)
m.On("GetAllocatableMemory").Return(tc.allMemory)
server := NewV1PodResourcesServer(m, m, m, m)
resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{})
if err != nil {

View File

@ -48,6 +48,11 @@ func (m *mockProvider) GetCPUs(podUID, containerName string) []int64 {
return args.Get(0).([]int64)
}
func (m *mockProvider) GetMemory(podUID, containerName string) []*podresourcesv1.ContainerMemory {
args := m.Called(podUID, containerName)
return args.Get(0).([]*podresourcesv1.ContainerMemory)
}
func (m *mockProvider) UpdateAllocatedDevices() {
m.Called()
}
@ -62,6 +67,11 @@ func (m *mockProvider) GetAllocatableCPUs() []int64 {
return args.Get(0).([]int64)
}
func (m *mockProvider) GetAllocatableMemory() []*podresourcesv1.ContainerMemory {
args := m.Called()
return args.Get(0).([]*podresourcesv1.ContainerMemory)
}
func TestListPodResourcesV1alpha1(t *testing.T) {
podName := "pod-name"
podNamespace := "pod-namespace"

View File

@ -112,9 +112,10 @@ type ContainerManager interface {
// GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources.
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
// Implements the podresources Provider API for CPUs and Devices
// Implements the podresources Provider API for CPUs, Memory and Devices
podresources.CPUsProvider
podresources.DevicesProvider
podresources.MemoryProvider
}
type NodeConfig struct {

View File

@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/config"
@ -1086,6 +1087,22 @@ func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
return []int64{}
}
func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory {
if cm.memoryManager == nil {
return []*podresourcesapi.ContainerMemory{}
}
return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
}
func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
if cm.memoryManager == nil {
return []*podresourcesapi.ContainerMemory{}
}
return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
}
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}
@ -1093,3 +1110,25 @@ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
func (cm *containerManagerImpl) UpdateAllocatedDevices() {
cm.deviceManager.UpdateAllocatedDevices()
}
func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresourcesapi.ContainerMemory {
var containerMemories []*podresourcesapi.ContainerMemory
for _, b := range blocks {
containerMemory := podresourcesapi.ContainerMemory{
MemoryType: string(b.Type),
Size_: b.Size,
Topology: &podresourcesapi.TopologyInfo{
Nodes: []*podresourcesapi.NUMANode{},
},
}
for _, numaNodeID := range b.NUMAAffinity {
containerMemory.Topology.Nodes = append(containerMemory.Topology.Nodes, &podresourcesapi.NUMANode{ID: int64(numaNodeID)})
}
containerMemories = append(containerMemories, &containerMemory)
}
return containerMemories
}
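containerMemoryFromBlock translates the memory manager's internal Block representation into the API type, expanding each NUMA affinity entry into a topology node. A small illustration of that mapping, written as if called from within package cm and using made-up values:

```go
// One 1Gi hugepages block pinned to NUMA node 0...
block := memorymanagerstate.Block{
	NUMAAffinity: []int{0},
	Type:         "hugepages-1Gi",
	Size:         1073741824,
}

mems := containerMemoryFromBlock([]memorymanagerstate.Block{block})

// ...becomes a single ContainerMemory entry:
//   mems[0].MemoryType == "hugepages-1Gi"
//   mems[0].Size_      == 1073741824
//   mems[0].Topology.Nodes == []*podresourcesapi.NUMANode{{ID: 0}}
```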

View File

@ -139,6 +139,14 @@ func (cm *containerManagerStub) GetAllocatableCPUs() []int64 {
return nil
}
func (cm *containerManagerStub) GetMemory(_, _ string) []*podresourcesapi.ContainerMemory {
return nil
}
func (cm *containerManagerStub) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
return nil
}
func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}

View File

@ -243,3 +243,11 @@ func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 {
func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
return nil
}
func (cm *containerManagerImpl) GetMemory(_, _ string) []*podresourcesapi.ContainerMemory {
return nil
}
func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
return nil
}

View File

@ -214,3 +214,16 @@ func (cm *FakeContainerManager) GetAllocatableCPUs() []int64 {
defer cm.Unlock()
return nil
}
func (cm *FakeContainerManager) GetMemory(_, _ string) []*podresourcesapi.ContainerMemory {
cm.Lock()
defer cm.Unlock()
cm.CalledFunctions = append(cm.CalledFunctions, "GetMemory")
return nil
}
func (cm *FakeContainerManager) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
cm.Lock()
defer cm.Unlock()
return nil
}

View File

@ -61,12 +61,12 @@ func (m *fakeManager) RemoveContainer(containerID string) error {
}
func (m *fakeManager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
klog.InfoS("Get Topology Hints")
klog.InfoS("Get Topology Hints", "pod", klog.KObj(pod), "containerName", container.Name)
return map[string][]topologymanager.TopologyHint{}
}
func (m *fakeManager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
klog.InfoS("Get Pod Topology Hints")
klog.InfoS("Get Pod Topology Hints", "pod", klog.KObj(pod))
return map[string][]topologymanager.TopologyHint{}
}
@ -74,6 +74,18 @@ func (m *fakeManager) State() state.Reader {
return m.state
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (m *fakeManager) GetAllocatableMemory() []state.Block {
klog.InfoS("Get Allocatable Memory")
return []state.Block{}
}
// GetMemory returns the memory allocated by a container from NUMA nodes
func (m *fakeManager) GetMemory(podUID, containerName string) []state.Block {
klog.InfoS("Get Memory", "podUID", podUID, "containerName", containerName)
return []state.Block{}
}
// NewFakeManager creates an empty/fake memory manager
func NewFakeManager() Manager {
return &fakeManager{

View File

@ -83,6 +83,12 @@ type Manager interface {
// GetMemoryNUMANodes provides NUMA nodes that are used to allocate the container memory
GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Int
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
GetAllocatableMemory() []state.Block
// GetMemory returns the memory allocated by a container from NUMA nodes
GetMemory(podUID, containerName string) []state.Block
}
type manager struct {
@ -115,6 +121,9 @@ type manager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string
// allocatableMemory holds the allocatable memory for each NUMA node
allocatableMemory []state.Block
}
var _ Manager = &manager{}
@ -173,6 +182,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
return err
}
m.allocatableMemory = m.policy.GetAllocatableMemory(m.state)
return nil
}
@ -184,7 +195,7 @@ func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID
m.containerMap.Add(string(pod.UID), container.Name, containerID)
}
// GetMemory provides NUMA nodes that used to allocate the container memory
// GetMemoryNUMANodes provides NUMA nodes that are used to allocate the container memory
func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Int {
// Get NUMA node affinity of blocks assigned to the container during Allocate()
numaNodes := sets.NewInt()
@ -407,3 +418,13 @@ func getSystemReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatab
return reservedMemoryConverted, nil
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (m *manager) GetAllocatableMemory() []state.Block {
return m.allocatableMemory
}
// GetMemory returns the memory allocated by a container from NUMA nodes
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
return m.state.GetMemoryBlocks(podUID, containerName)
}

View File

@ -112,6 +112,11 @@ func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string]
return nil
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (p *mockPolicy) GetAllocatableMemory(s state.State) []state.Block {
return []state.Block{}
}
type mockRuntimeService struct {
err error
}

View File

@ -41,4 +41,6 @@ type Policy interface {
// and is consulted to achieve NUMA aware resource alignment among this
// and other resource controllers.
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
GetAllocatableMemory(s state.State) []state.Block
}

View File

@ -66,3 +66,8 @@ func (p *none) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Contai
func (p *none) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
return nil
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (p *none) GetAllocatableMemory(s state.State) []state.Block {
return []state.Block{}
}

View File

@ -767,3 +767,24 @@ func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.Topolog
}
return &bestHint
}
// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (p *staticPolicy) GetAllocatableMemory(s state.State) []state.Block {
var allocatableMemory []state.Block
machineState := s.GetMachineState()
for numaNodeID, numaNodeState := range machineState {
for resourceName, memoryTable := range numaNodeState.MemoryMap {
if memoryTable.Allocatable == 0 {
continue
}
block := state.Block{
NUMAAffinity: []int{numaNodeID},
Type: resourceName,
Size: memoryTable.Allocatable,
}
allocatableMemory = append(allocatableMemory, block)
}
}
return allocatableMemory
}
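The static policy derives allocatable memory by walking the machine state: one Block per NUMA node and resource with a non-zero Allocatable value. A rough sketch of the state shape it iterates, assuming the memory manager state types (NUMANodeMap/MemoryTable) and purely illustrative sizes:

```go
// Hypothetical machine state with one NUMA node, regular memory plus 2Mi hugepages.
machineState := state.NUMANodeMap{
	0: &state.NUMANodeState{
		MemoryMap: map[v1.ResourceName]*state.MemoryTable{
			v1.ResourceMemory: {Allocatable: 5368709120},
			"hugepages-2Mi":   {Allocatable: 1073741824},
		},
	},
}

// GetAllocatableMemory over such a state would emit one Block per entry, e.g.
//   {NUMAAffinity: []int{0}, Type: "memory",        Size: 5368709120}
//   {NUMAAffinity: []int{0}, Type: "hugepages-2Mi", Size: 1073741824}
_ = machineState
```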

View File

@ -2277,7 +2277,7 @@ func (kl *Kubelet) ListenAndServePodResources() {
klog.V(2).InfoS("Failed to get local endpoint for PodResources endpoint", "err", err)
return
}
server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager)
server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager, kl.containerManager)
}
// Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.

View File

@ -188,10 +188,10 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st
}
// ListenAndServePodResources initializes a gRPC server to serve the PodResources service
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider) {
func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider, memoryProvider podresources.MemoryProvider) {
server := grpc.NewServer()
podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider))
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider))
podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider, memoryProvider))
l, err := util.CreateListener(socket)
if err != nil {
klog.ErrorS(err, "Failed to create listener for podResources endpoint")